Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type BASIC_RESOURCE = CRITERIA & {
patientAge?: PATIENT_AGE, // The age constraint of the patient
dateRangeList?: Array<DATE_RANGE>, // The date range constraint of the resource
encounterDateRange?: DATE_RANGE // The date range constraint of the related encounter
uniqueFields?: Array<UNIQUE_FIELD> // The unique fields of the resource to count the patients
}

type PATIENT_AGE = {
Expand All @@ -144,6 +145,12 @@ type DATE_RANGE = {
dateIsNotNull?: boolean
}

type UNIQUE_FIELD = {
name?: string, // The FHIR resource field to check. For now only codes are supported, so this field is ignored and does not need to be filled
operator: string, // The operator to use for the verification
n: number // The number of unique values
}

type TEMPORAL_CONSTRAINT = {
idList: string | Array<number>,
constraintType: "sameEncounter" | "differentEncounter" | "directChronologicalOrdering" | "sameEpisodeOfCare",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,42 @@ class QueryBuilderBasicResource(val querySolver: ResourceResolver) {
} else criterionDataFrame
}

/** Filter the patients of the input dataframe with the unique field constraints of the resource.
  *
  * For every constraint, the rows are grouped by (subject, code) and counted, and only the
  * groups whose count satisfies `count <operator> n` are kept. A row of the input dataframe is
  * retained when its subject appears in the kept groups of every constraint. The constraint
  * `>= 1` is skipped because it holds for every existing group and therefore filters nothing.
  *
  * Each constraint is applied with its own `left_semi` join, so several constraints yield the
  * intersection of the matching patients (chaining the per-constraint dataframes with a
  * condition-less join would build a cartesian product with duplicated subject columns).
  *
  * @param criterionDataFrame resulting dataframe of patient of a basicResource
  * @param basicResource      resource holding the optional `uniqueFields` constraints
  * @param criterionId        id of the criterion, used to build the subject and code column names
  * @return the input dataframe restricted to the matching subjects, or the input dataframe
  *         itself when there is no constraint to apply
  */
private def filterByUniqueCodes(criterionDataFrame: DataFrame,
                                basicResource: BasicResource,
                                criterionId: Short): DataFrame = {
  basicResource.uniqueFields.fold(criterionDataFrame) { constraints =>
    val subjectColumn = QueryBuilderUtils.getSubjectColumn(criterionId)
    val codeColumn = QueryBuilderUtils.buildColName(criterionId, QueryColumn.CODE)

    val patientFilters: List[DataFrame] = constraints
      .filterNot(constraint => constraint.operator == ">=" && constraint.n == 1)
      .map { constraint =>
        // The request uses "=" for equality while the filter expression is written with "==".
        val operator = if (constraint.operator == "=") "==" else constraint.operator
        // NOTE(review): `operator` comes from the parsed request and is interpolated as-is in
        // the filter expression; consider validating it against the supported operators.
        criterionDataFrame
          .groupBy(subjectColumn, codeColumn)
          .count()
          .filter(s"count $operator ${constraint.n}")
          .drop("count")
      }

    // One semi join per constraint: a subject must match all of them to be kept.
    patientFilters.foldLeft(criterionDataFrame) { (remaining, patientFilter) =>
      remaining.join(patientFilter,
                     remaining(subjectColumn) <=> patientFilter(subjectColumn),
                     "left_semi")
    }
  }
}

/** Filter patient of input dataframe which does not have the required amount of occurrence.
*
* @param criterionDataFrame resulting dataframe of patient of a basicResource
Expand Down Expand Up @@ -268,10 +304,11 @@ class QueryBuilderBasicResource(val querySolver: ResourceResolver) {
basicResource,
criterionId,
isInTemporalConstraint)
criterionDataFrame = filterByUniqueCodes(criterionDataFrame, basicResource, criterionId)
criterionDataFrame = qbUtils.cleanDataFrame(criterionDataFrame,
isInTemporalConstraint,
selectedColumns,
subjectColumn)
isInTemporalConstraint,
selectedColumns,
subjectColumn)

if (logger.isDebugEnabled) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ abstract class BaseQuery(val _type: String, _id: Short, isInclusive: Boolean) {
val IsInclusive: Boolean = isInclusive
}

/** Constraint on the values of a field of a resource.
  *
  * @param name     optional name of the resource field targeted by the constraint
  * @param operator comparison operator of the constraint
  * @param n        value the operator compares against
  */
case class UniqueFieldConstraint(name: Option[String], operator: String, n: Int)

case class Occurrence(n: Int,
operator: String,
sameEncounter: Option[Boolean],
Expand All @@ -35,6 +41,7 @@ case class BasicResource(_id: Short,
occurrence: Option[Occurrence] = None,
patientAge: Option[PatientAge] = None,
encounterDateRange: Option[DateRange] = None,
uniqueFields: Option[List[UniqueFieldConstraint]] = None,
nullAvailableFieldList: Option[List[String]] = None)
extends BaseQuery("basic_resource", _id, isInclusive) {
override def toString: String = getClass.getName + "@" + Integer.toHexString(hashCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,15 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
} else solrFieldList
}

/** Add the code fields of the collection to the given field list when the query defines
  * unique field constraints.
  *
  * @param solrFieldList fields selected so far
  * @return the input list extended with the code fields configured for the collection (without
  *         duplicates), or the input list unchanged when the query has no unique field
  *         constraint or the collection has no code field configured
  */
def getCodeFieldList(solrFieldList: List[String]): List[String] = {
  if (genericQuery.uniqueFields.isEmpty) {
    solrFieldList
  } else {
    val requestKeys = queryBuilderConfigs.requestKeyPerCollectionMap(collection)
    requestKeys.get(QueryColumn.CODE) match {
      case Some(codeFields) => (solrFieldList ++ codeFields).distinct
      case None             => solrFieldList
    }
  }
}

def getIsDateTimeAvailable: Boolean = {
val colsMapping = queryBuilderConfigs
.requestKeyPerCollectionMap(collection)
Expand All @@ -306,6 +315,7 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
requiredSolrFieldList = getResourceGroupByFieldList(requiredSolrFieldList)
requiredSolrFieldList =
convertDatePreferenceToDateTimeSolrField(requiredSolrFieldList, collection)
requiredSolrFieldList = getCodeFieldList(requiredSolrFieldList)
val isDateTimeAvailable: Boolean = getIsDateTimeAvailable
val isEncounterAvailable: Boolean = getIsEncounterAvailable
val isEpisodeOfCareAvailable: Boolean = getIsEpisodeOfCareAvailable
Expand Down Expand Up @@ -353,10 +363,10 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
val isDateTimeAvailable: Boolean = getIsDateTimeAvailable
val isEncounterAvailable: Boolean = getIsEncounterAvailable
CriterionTags(isDateTimeAvailable,
isEncounterAvailable,
getIsEpisodeOfCareAvailable,
isInTemporalConstraint = false,
withOrganizations = requestOrganization)
isEncounterAvailable,
getIsEpisodeOfCareAvailable,
isInTemporalConstraint = false,
withOrganizations = requestOrganization)
}

private def convertDatePreferenceToDateTimeSolrField(datePreferenceList: List[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ object QueryParser {
temporalConstraints: Option[List[GenericTemporalConstraint]],
criteria: Option[List[GenericQuery]],
nAmongMOptions: Option[Occurrence],
uniqueFields: Option[List[UniqueFieldConstraint]],
version: Option[String],
sourcePopulation: Option[SourcePopulationDTO],
request: Option[GenericQuery],
Expand All @@ -153,6 +154,7 @@ object QueryParser {
Json.reads[GenericTemporalConstraint]
implicit lazy val sourcePopulationReads = Json.reads[SourcePopulationDTO]
implicit lazy val dateRange = Json.reads[DateRange]
implicit lazy val uniqueFieldConstraintReads = Json.reads[UniqueFieldConstraint]
implicit lazy val queryRead = Json.reads[GenericQuery]
logger.info(s"Trying to parse query ${cohortDefinitionSyntaxJsonString}")
val cohortRequestOption =
Expand Down Expand Up @@ -256,6 +258,7 @@ object QueryParser {
else genericQuery.filterFhir.get,
occurrence = genericQuery.occurrence,
patientAge = genericQuery.patientAge,
uniqueFields = genericQuery.uniqueFields,
nullAvailableFieldList = genericQuery.nullAvailableFieldList,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fr.aphp.id.eds.requester.query.resolver.rest
import fr.aphp.id.eds.requester.query.resolver.ResourceConfig
import fr.aphp.id.eds.requester.{FhirResource, QueryColumn}
import org.hl7.fhir.instance.model.api.IBase
import org.hl7.fhir.r4.model.{DateTimeType, DateType, IdType, Reference}
import org.hl7.fhir.r4.model._

case class QueryColumnMapping(queryColName: String,
fhirPath: String,
Expand Down Expand Up @@ -38,21 +38,21 @@ class RestFhirQueryElementsConfig extends ResourceConfig {
QueryColumnMapping(QueryColumn.ENCOUNTER_END_DATE, "period.end", classOf[DateTimeType])),
)),
FhirResource.OBSERVATION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectiveDateTime"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectiveDateTime"), codeColumn = Some("code.coding.code"))),
FhirResource.CONDITION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("recordedDate"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("recordedDate"), codeColumn = Some("code.coding.code"))),
FhirResource.MEDICATION_REQUEST -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"),
Some("encounter"),
Some("dispenseRequest.validityPeriod.start"))),
Some("dispenseRequest.validityPeriod.start"), codeColumn = Some("medicationCodeableConcept.coding.code"))),
FhirResource.MEDICATION_ADMINISTRATION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectivePeriod.start"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectivePeriod.start"), codeColumn = Some("medicationCodeableConcept.coding.code"))),
FhirResource.DOCUMENT_REFERENCE -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"))),
FhirResource.CLAIM -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("created"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("created"), codeColumn = Some("diagnosis.diagnosisCodeableConcept.coding.code"))),
FhirResource.PROCEDURE -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"), codeColumn = Some("code.coding.code"))),
FhirResource.IMAGING_STUDY -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("started"))),
FhirResource.QUESTIONNAIRE_RESPONSE -> addJoinedResourceColumns(
Expand Down Expand Up @@ -120,7 +120,9 @@ class RestFhirQueryElementsConfig extends ResourceConfig {

private def defaultResourceMapping(patientColumn: Option[String] = Some("patient"),
encounterColumn: Option[String] = Some("encounter"),
eventColumn: Option[String] = None): List[ResourceMapping] = {
eventColumn: Option[String] = None,
codeColumn: Option[String] = None
): List[ResourceMapping] = {
var resourceMappingList = List(
ResourceMapping(QueryColumnMapping(QueryColumn.ID, "id", classOf[IdType]))
)
Expand All @@ -136,6 +138,10 @@ class RestFhirQueryElementsConfig extends ResourceConfig {
resourceMappingList = resourceMappingList :+ ResourceMapping(
QueryColumnMapping(QueryColumn.EVENT_DATE, eventColumn.get, classOf[DateTimeType]))
}
if (codeColumn.isDefined) {
resourceMappingList = resourceMappingList :+ ResourceMapping(
QueryColumnMapping(QueryColumn.CODE, codeColumn.get, classOf[CodeableConcept]))
}
resourceMappingList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ import fr.aphp.id.eds.requester.{FhirResource, QueryColumn}
class SolrQueryElementsConfig extends ResourceConfig {

def buildMap(patientCol: List[String],
dateColListTarget: List[String]): Map[String, List[String]] = {
dateColListTarget: List[String], codeCol: Option[List[String]] = None): Map[String, List[String]] = {
Map(
QueryColumn.PATIENT -> patientCol,
QueryColumn.EVENT_DATE -> dateColListTarget,
QueryColumn.ENCOUNTER -> List(SolrColumn.ENCOUNTER),
QueryColumn.ENCOUNTER_START_DATE -> List(SolrColumn.ENCOUNTER_START_DATE),
QueryColumn.ENCOUNTER_END_DATE -> List(SolrColumn.ENCOUNTER_END_DATE),
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)
)
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS),
) ++ codeCol.map(QueryColumn.CODE -> _).toMap
}

def buildMap(dateColListTarget: List[String]): Map[String, List[String]] = {
buildMap(List(SolrColumn.PATIENT), dateColListTarget)
def buildDefaultMap(dateColListTarget: List[String], codeCol: Option[List[String]] = None): Map[String, List[String]] = {
buildMap(List(SolrColumn.PATIENT), dateColListTarget, codeCol)
}

override def requestKeyPerCollectionMap: Map[String, Map[String, List[String]]] = Map(
Expand All @@ -32,26 +32,38 @@ class SolrQueryElementsConfig extends ResourceConfig {
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)
),
FhirResource.MEDICATION_REQUEST -> buildMap(
List(SolrColumn.MedicationRequest.PERIOD_START, SolrColumn.MedicationRequest.PERIOD_END)),
FhirResource.MEDICATION_ADMINISTRATION -> buildMap(
List(SolrColumn.MedicationAdministration.PERIOD_START)),
FhirResource.OBSERVATION -> buildMap(List(SolrColumn.Observation.EFFECTIVE_DATETIME)),
FhirResource.CONDITION -> buildMap(List(SolrColumn.Condition.RECORDED_DATE)),
FhirResource.MEDICATION_REQUEST -> buildDefaultMap(
List(SolrColumn.MedicationRequest.PERIOD_START, SolrColumn.MedicationRequest.PERIOD_END),
codeCol = Some(List(SolrColumn.MedicationRequest.CODE_ATC, SolrColumn.MedicationRequest.CODE_UCD))
),
FhirResource.MEDICATION_ADMINISTRATION -> buildDefaultMap(
List(SolrColumn.MedicationAdministration.PERIOD_START),
codeCol = Some(List(SolrColumn.MedicationAdministration.CODE_ATC, SolrColumn.MedicationAdministration.CODE_UCD))
),
FhirResource.OBSERVATION -> buildDefaultMap(List(SolrColumn.Observation.EFFECTIVE_DATETIME),
codeCol = Some(List(SolrColumn.Observation.CODE))
),
FhirResource.CONDITION -> buildDefaultMap(List(SolrColumn.Condition.RECORDED_DATE),
codeCol = Some(List(SolrColumn.Condition.CODE))
),
FhirResource.PATIENT -> Map(QueryColumn.PATIENT -> List(SolrColumn.PATIENT),
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)),
FhirResource.DOCUMENT_REFERENCE -> buildMap(List(SolrColumn.Document.DATE)),
FhirResource.COMPOSITION -> buildMap(List(SolrColumn.Document.DATE)),
FhirResource.DOCUMENT_REFERENCE -> buildDefaultMap(List(SolrColumn.Document.DATE)),
FhirResource.COMPOSITION -> buildDefaultMap(List(SolrColumn.Document.DATE)),
FhirResource.GROUP -> Map(QueryColumn.PATIENT -> List(SolrColumn.Group.RESOURCE_ID),
QueryColumn.ID -> List(SolrColumn.ID)),
FhirResource.CLAIM -> buildMap(List(SolrColumn.Claim.CREATED)),
FhirResource.PROCEDURE -> buildMap(List(SolrColumn.Procedure.DATE)),
FhirResource.CLAIM -> buildDefaultMap(List(SolrColumn.Claim.CREATED),
codeCol = Some(List(SolrColumn.Claim.CODE))
),
FhirResource.PROCEDURE -> buildDefaultMap(List(SolrColumn.Procedure.DATE),
codeCol = Some(List(SolrColumn.Procedure.CODE))
),
FhirResource.IMAGING_STUDY -> (buildMap(List(SolrColumn.PATIENT),
List(SolrColumn.ImagingStudy.STARTED,
SolrColumn.ImagingStudy.SERIES_STARTED)) ++ Map(
QueryColumn.GROUP_BY -> List(SolrColumn.ImagingStudy.STUDY_ID))),
FhirResource.QUESTIONNAIRE_RESPONSE -> (buildMap(
FhirResource.QUESTIONNAIRE_RESPONSE -> (buildDefaultMap(
List(SolrColumn.QuestionnaireResponse.AUTHORED)) ++ Map(
QueryColumn.EPISODE_OF_CARE -> List(SolrColumn.EPISODE_OF_CARE))),
FhirResource.UNKNOWN -> Map()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import fr.aphp.id.eds.requester.query.resolver.{ResourceConfig, ResourceResolver
import fr.aphp.id.eds.requester.tools.SolrTools
import org.apache.log4j.Logger
import org.apache.solr.client.solrj.SolrQuery
import org.apache.spark.sql.functions.{array, array_join, col, explode}
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Class for questioning solr. */
Expand Down Expand Up @@ -42,6 +43,16 @@ class SolrQueryResolver(solrSparkReader: SolrSparkReader,
}
val convFunc = (columnName: String) =>
qbConfigs.reverseColumnMapping(resource.resourceType, columnName)

if (resource.uniqueFields.isDefined) {
val codeColumns = qbConfigs.requestKeyPerCollectionMap(resource.resourceType).getOrElse(QueryColumn.CODE, List())
if (codeColumns.nonEmpty) {
criterionDataFrame = criterionDataFrame.withColumn(
QueryColumn.CODE,
array_join(array(codeColumns.map((c) => col(s"`${c}`")): _*), ",")
)
}
}
criterionDataFrame.toDF(criterionDataFrame.columns.map(c => convFunc(c)).toSeq: _*)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,35 @@ package object solr {

/** Solr field names used for MedicationAdministration resources. */
object MedicationAdministration {
// Field configured as the event date of the resource in SolrQueryElementsConfig.
final val PERIOD_START = "effectivePeriod.start"
// Both fields are configured together as the code columns of the resource.
final val CODE_ATC = "_sort.atc"
final val CODE_UCD = "_sort.ucd"
}

/** Solr field names used for MedicationRequest resources. */
object MedicationRequest {
// Start and end fields are both configured as event dates of the resource.
final val PERIOD_START = "dispenseRequest.validityPeriod.start"
final val PERIOD_END = "dispenseRequest.validityPeriod.end"
// Both fields are configured together as the code columns of the resource.
final val CODE_ATC = "_sort.atc"
final val CODE_UCD = "_sort.ucd"
}

/** Solr field names used for Observation resources. */
object Observation {
// Field configured as the event date of the resource in SolrQueryElementsConfig.
final val EFFECTIVE_DATETIME = "effectiveDateTime"
// Field configured as the code column of the resource.
final val CODE = "code.coding.display.anabio"
}

/** Solr field names used for Claim resources. */
object Claim {
// Field configured as the event date of the resource in SolrQueryElementsConfig.
final val CREATED = "created"
// Field configured as the code column of the resource.
final val CODE = "diagnosis.diagnosisCodeableConcept.coding.display"
}

/** Solr field names used for Condition resources. */
object Condition {
// Field configured as the event date of the resource in SolrQueryElementsConfig.
final val RECORDED_DATE = "recordedDate"
// Field configured as the code column of the resource.
final val CODE = "code.coding.display"
}

/** Solr field names used for Procedure resources. */
object Procedure {
// Field configured as the event date of the resource in SolrQueryElementsConfig.
final val DATE = "performedDateTime"
// Field configured as the code column of the resource.
final val CODE = "code.coding.display"
}

object Document {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/fr/aphp/id/eds/requester/requester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ package object requester {
final val ENCOUNTER_START_DATE = "encounter_start_date"
final val ENCOUNTER_END_DATE = "encounter_end_date"
final val PATIENT_BIRTHDATE = "patient_birthdate"
final val CODE = "code"
final val EVENT_DATE = "event_date"
final val LOCAL_DATE = "localDate"
final val AGE = "age"
Expand Down
3 changes: 3 additions & 0 deletions src/test/resources/testCases/nAmongMUniqueFields/expected.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
subject_id
7
1
Loading