diff --git a/README.md b/README.md index 37ad6dff..df3ab339 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,70 @@ -# [toFHIR](https://onfhir.io/tofhir/index.html) -toFHIR is an easy-to-use data mapping and high-performant data transformation tool to transform existing datasets from -various types of sources to HL7 FHIR. It can be used as a library or standalone tool for data integration and data -transformation into HL7 FHIR. The standalone mode accepts command line arguments to either run a batch execution right -away or to start a command line interface (CLI) to accept certain commands. +# Ignifyr (formerly toFHIR) -toFHIR can read from various data sources such as a file system, relational database, streaming inputs like Apache Kafka or a FHIR Server. -toFHIR's mapping language utilizes [onfhir-template-engine](https://github.com/srdc/fhir-template-engine)'s template language and allows 1-to-1, 1-to-many, -many-to-1 and many-to-many mappings. By executing the mapping definitions, toFHIR generates HL7 FHIR resources and they can -be either persisted to a file system or to a running HL7 FHIR endpoint. +[![Research by SRDC](https://img.shields.io/badge/Research-SRDC-red)](https://srdc.com.tr) +[![Commercial Support](https://img.shields.io/badge/Commercial%20Support-Pontegra-blue)](https://pontegra.com) + +> [!IMPORTANT] +> **Rebranding Announcement** +> +> **toFHIR** has been officially rebranded as **Ignifyr**. This change reflects our transition from a research-focused engine at [**SRDC**](https://srdc.com.tr) to a commercially supported product line by [**Pontegra**](https://pontegra.com). +> +> **Note on Technical Migration:** While the project identity has changed, the internal codebase - including package names (`io.tofhir`), configuration keys, and Docker image tags - still uses the legacy `tofhir` naming. We are planning a phased migration for these technical components in upcoming major releases. 
+ +--- + +## Overview +**Ignifyr** is a powerful, FHIR-first ETL engine designed to map legacy health data to the **HL7 FHIR** standard. It is built to handle complex mappings with high performance and scalability. + +It can be used as a library or a standalone tool for data integration. The standalone mode accepts command-line arguments to run batch executions or start a command-line interface (CLI) for interactive mapping management. + +* **Website:** [ignifyr.io](https://ignifyr.io) +* **Research Paper:** _To be published soon..._ + +### Key Capabilities + +* **Versatile Connectivity:** Read from file systems, RDBMS, Apache Kafka, REDCap, or FHIR servers. +* **Advanced Mapping:** Utilizes the [onfhir-template-engine](https://github.com/srdc/fhir-template-engine) to support 1-to-1, 1-to-many, many-to-1, and many-to-many mappings. +* **Flexible Output:** Generate HL7 FHIR resources and persist them to a file system or directly to a FHIR endpoint (e.g., [Repofyr](https://repofyr.io)). ## Modules -toFHIR consists of the following modules: -- `tofhir-engine`: The core module of toFHIR which includes the main functionality of the tool. -- `tofhir-server`: A standalone web server module that provides a REST API to run mapping jobs via `tohir-engine` and manage the mapping job definitions. -- `tofhir-server-common`: Holds common files like server configuration or errors for server implementations. -- `tofhir-common`: Contains model and utility classes shared across various modules. -- `tofhir-rxnorm`: Provides a client implementation to access the RxNorm API and a FHIR Path Function library to utilize its API functionalities in mapping definitions. +Ignifyr consists of the following modules: +- `tofhir-engine`: The core engine including the main mapping and transformation functionality. +- `tofhir-server`: A standalone web server providing a REST API to manage and run mapping jobs. +- `tofhir-server-common`: Shared files and configurations for server implementations. 
+- `tofhir-common`: Shared model and utility classes. +- `tofhir-rxnorm`: Client for RxNorm API access and FHIRPath functions for terminology integration. For a visual representation of the dependencies between these modules, please refer to the diagram below: ![module-component-diagram.png](readme-assets%2Fmodule-component-diagram.png) ## Requirements -toFHIR requires the following to run: +To run Ignifyr, you need: * Java 11.0.2 * Scala 2.13 -* An HL7 FHIR repository if you would like to persist the created resources (e.g., [onFHIR](https://github.com/srdc/onfhir)) +* An HL7 FHIR repository if you would like to persist the created resources (e.g., [Repofyr](https://repofyr.io)) -## Supported Data Source Types -toFHIR can read data from the following data source types: +## Supported Data Sources +Ignifyr can read data from the following data source types: * File System (Excel, CSV, TSV, JSON, Parquet) -* RDMS (PostgreSQL) +* RDBMS (PostgreSQL) * Apache Kafka * REDCap -* FHIR Server (OnFHIR, Firely Server etc.) +* FHIR Server (Repofyr, HAPI FHIR Server, Firely Server etc.) ## Usage -toFHIR can be used through its standalone `tofhir-engine` or via the web server `tofhir-server`. +Ignifyr can be utilized via the standalone Engine (CLI/Batch) or the Web Server (REST API). -## toFHIR Engine +### 1. Ignifyr Engine (CLI & Batch) -If the engine will be used as a standalone tool, when it is started, the engine waits for the commands from the command line interface (CLI). -Also, arguments can be provided to the executable to start the engine with a specific command or configuration file. -Possible arguments to the executable are as follows: +When started as a standalone tool, the engine can run in two modes based on arguments: -- `cli`: Starts the CLI. This is the default command if no arguments are provided. -- `run`: Runs the configured mapping-job as a batch job and shuts down after finishing. 
`run` command accepts the following parameters: +- `cli`: Starts the interactive Command Line Interface (Default). +- `run`: Runs a configured mapping-job as a batch process and shuts down. `run` command accepts the following parameters: - `--job`: The path to the mapping-job to be executed. If provided, overrides the path provided to the JVM as the configuration parameter. - `--mappings`: The path to the mappings folder. If provided, overrides the path provided to the JVM as the configuration parameter. - `--schemas`: The path to the schemas folder. If provided, overrides the path provided to the JVM as the configuration parameter. @@ -59,24 +74,26 @@ Possible arguments to the executable are as follows: - `--definition-root-url`: The root url of FHIR resources - `--encoding`: The encoding of CSV file whose default value is UTF-8 (OPTIONAL) -### CLI +#### CLI Commands -toFHIR serves via CLI with certain commands: +Ignifyr serves via CLI with certain commands: - `help`: Displays the help text and see the available commands and their use. - `info`: See info about the loaded Mapping Job. -- `load`: Loads a Mapping Job Load the Mapping Job definition file from the path. +- `load `: Loads a Mapping Job Load the Mapping Job definition file from the path. - `run [|]`: Run the task(s). Without a parameter, all task of the loaded Mapping Job are run. A specific task can be indicated with its name or URL. -- `extract-redcap-schemas [path] [definition-root-url] [encoding]`: Extracts schemas from the given REDCap data dictionary file. Schemas will be annotated with the given definition root url. If the encoding of CSV file is different from UTF-8, you should provide it. -- `stop`: Stop the execution of the MappingJob (if any). -- `exit|quit`: Exit the program. After the app is up and running, these commands are ready to be executed. If there is no mapping job loaded initially, firstly, a mapping job needs to be loaded with the command `load `. 
This command loads the mapping job located in the path. After that, the mapping job can be run with the command `run`. +### 2. Ignifyr Server (REST API) +The server provides a REST API to manage the lifecycle of mapping projects. +* **Base URL:** http://:8085/tofhir (default) +* **API Documentation:** [SwaggerHub API Docs](https://app.swaggerhub.com/apis-docs/toFHIR/toFHIR-Server/) + ### Configurations -Here is an example of a configuration file: +Ignifyr uses HOCON-based configuration. Below is a snippet of the standard tofhir.conf structure: ```conf tofhir { @@ -166,7 +183,7 @@ akka = { } ``` -Considering the configuration file defined above, toFHIR can be utilized with a folder structure like the following: +Considering the configuration file defined above, Ignifyr can be utilized with a folder structure like the following: ```html tofhir-definitions (root folder of definitions) @@ -198,100 +215,14 @@ tofhir-definitions (root folder of definitions) └── tofhir.conf ``` -Of course, you are free to organize the definitions in any way you like and arrange the configuration file accordingly. +You are free to organize the definitions in any way you like and arrange the configuration file accordingly. However, we are suggesting to keep the definitions in a folder structure as shown above to keep the definitions organized and easy to manage. -## toFHIR Server - -If the web server is used, it will start the web server and the engine will be available via the REST API. -Considering the same folder structure example given above, the server will also use the same configuration settings. -Additionally, there are extra configurations to be made in the configuration file: - -```conf -fhir = { - # major FHIR version, currently R4 and R5 is supported - fhir-version = "R4" - # List of root URLs while retrieving the definitions (profiles, valuesets, codesystems). - # The definitions below the given root URLs will be retrieved from the configured paths or FHIR endpoints. 
- # All definitions will be retrieved if no root URLs are provided. - # e.g. ["https://aiccelerate.eu", "https://fair4health.eu"] - definitions-root-urls = ["http://hl7.org/fhir/"] - - # FHIR URL to retrieve resource definitions (profiles, valuesets and codesystems). - # If this URL is defined, file paths (profiles-path, valuesets-path, codesystems-path) will be ignored even if they are also provided. - # For now, toFHIR can read definitions from a single FHIR endpoint. - definitions-fhir-endpoint = "http://localhost:8080/fhir" - fhir-endpoint-auth = { - # basic | token | fixed-token - # If one of the auth methods is selected, its configurations must be provided as shown below. - method = null - -# # basic configurations are used if the auth method is basic -# basic = { -# username = "user" -# password = "pass" -# } -# -# # token configurations are used if the auth method is token -# token = { -# client-id = "id" -# client-secret = "secret" -# scopes = [] -# token-endpoint = "https://onauth.srdc.com.tr" -# } - -# # fixed token configurations are used if the auth method is fixed-token -# fixed-token = "XXX" - } - - # Path to the zip file or folder that includes the FHIR resource and data type profile definitions (FHIR StructureDefinition) to be served by toFHIR webserver so that mappings can be performed accordingly. - profiles-path = null - - # Path to the zip file or folder that includes the FHIR Value Set definitions (FHIR ValueSet) that are referenced by your FHIR profiles. - valuesets-path = null - - # Path to the zip file or folder that includes the FHIR Code system definitions (FHIR CodeSystem) that are referenced by your FHIR value sets. - codesystems-path = null -} - -webserver = { - # Hostname that toFHIR server will work. Using 0.0.0.0 will bind the server to both localhost and the IP of the server that you deploy it. - host = 0.0.0.0 - - # Port to listen - port = 8085 - - # Base Uri for server e.g. 
With this default configuration, the root path of toFHIR server will be http://localhost:8085/tofhir - base-uri = tofhir - - ssl { - # Path to the java keystore for enabling ssl for toFHIR server, use null to disable ssl - keystore = null - # Password of the keystore for enabling ssl for toFHIR server - password = null - } -} -``` - -After the server is up and running, the engine will be available via the REST API. - -With the REST APIs, you are able to do the following operations: -* Create/edit projects and subsequently create/edit schemas, mappings, mapping contexts, mapping jobs -* Create/edit terminology systems -* Test the mappings -* Run mapping jobs -* Query and see the logs of the mapping job execution results - -API documentations for `tofhir-server` can be found at the following URLs: - -https://app.swaggerhub.com/apis-docs/toFHIR/toFHIR-Server - -## Definitions Used in toFHIR +## Definitions Used in Ignifyr ### Project -A project is a container for schemas, mappings, mapping contexts, mapping jobs. -It is a concept used in toFHIR to organize the definitions and to group them together. +A container for schemas, mappings, mapping contexts, and mapping jobs. It organizes definitions into logical groups. Please note that, terminology systems are not included in a project. They are defined separately and can be used in any project. @@ -354,7 +285,7 @@ An example of a simple mapping definition file: The json snippet above illustrates the structure of an example mapping. On the top, the `url`, `name`, and `title` fields are the metadata of the mapping. The `source` field is used to define the source schema of the mapping. The `mapping` field is the list of mapping definitions. The real magic in mappings happens in the `expression` fields (e.g. {{``}} ). -toFHIR uses the expression to generate the FHIR resources by using [onfhir-template-engine](https://github.com/srdc/fhir-template-engine). 
+Ignifyr uses the expression to generate the FHIR resources by using [onfhir-template-engine](https://github.com/srdc/fhir-template-engine). By doing so, it can generate the FHIR resources based on the source data. For example, considering `{{gender}}` expression, it refers to "gender" column in the source data. @@ -516,7 +447,7 @@ and returns a HL7 FHIR Quantity object: ###### 1. Batch Mode -If not set explicitly, toFHIR uses the batch mode by default. In the batch mode, toFHIR goes through these steps: +If not set explicitly, Ignifyr uses the batch mode by default. In the batch mode, Ignifyr goes through these steps: 1. Reads the source data 2. Executes the mappings 3. Persists the generated FHIR resources to the sink @@ -596,12 +527,12 @@ This field is a relative path to the `dataFolderPath` defined in the source sett ###### 2. Streaming Mode -toFHIR supports streaming of file system in case you want to continuously monitor the changes on the source data and stream the -newcoming/updated data to toFHIR mapping executions. This can be done with the `asStream` config parameter of the source. -If it is set to `true`, toFHIR will monitor the FileSystemSource files defined at `path` paths and trigger the mapping -executions in case the files are updated. toFHIR automatically marks the processed data source files and only processes the newcoming/updated records. +Ignifyr supports streaming of file system in case you want to continuously monitor the changes on the source data and stream the +newcoming/updated data to Ignifyr mapping executions. This can be done with the `asStream` config parameter of the source. +If it is set to `true`, Ignifyr will monitor the FileSystemSource files defined at `path` paths and trigger the mapping +executions in case the files are updated. Ignifyr automatically marks the processed data source files and only processes the newcoming/updated records. 
-toFHIR goes through these steps in the streaming mode: +Ignifyr goes through these steps in the streaming mode: 1. Reads the initial existing source data 2. Executes the mappings 3. Persists the generated FHIR resources to the sink @@ -653,7 +584,7 @@ Example of a Mapping Job definition file with csv source type in streaming mode: The json snippet above illustrates the structure of an example mapping job in streaming mode. Similar to the batch mode, most of the fields are the same. The only differences are: - `asStream` field in the source settings -- `path` in the source binding of the mapping. `path` should be the name of the **folder** this time, and it is where toFHIR will monitor the changes. +- `path` in the source binding of the mapping. `path` should be the name of the **folder** this time, and it is where Ignifyr will monitor the changes. ##### SQL @@ -728,8 +659,8 @@ Mapping job and mapping examples shown below for the streaming type of sources l } } ``` -toFHIR only considers the value field of kafka topics. Therefore, when you subscribe a topic, -toFHIR waits for string-type data but in correct JSON format. +Ignifyr only considers the value field of kafka topics. Therefore, when you subscribe a topic, +Ignifyr waits for string-type data but in correct JSON format. For example, when you want to use the data in the topic, you should publish the data in the following format: ```json { @@ -739,7 +670,7 @@ For example, when you want to use the data in the topic, you should publish the } ``` ##### RedCAP -toFHIR integrates seamlessly with RedCAP through the [tofhir-redcap integration module](https://github.com/srdc/tofhir-redcap). +Ignifyr integrates seamlessly with RedCAP through the [tofhir-redcap integration module](https://github.com/srdc/tofhir-redcap). 
Utilize the same configuration approach as described for Kafka, with a few key considerations: - **Source Configuration**: In the mapping job's **sourceSettings**, specify that the data originates from RedCAP by setting the `asRedCap` field to `true`. Here's an example JSON configuration: @@ -799,7 +730,7 @@ Within the mapping source, you can define the resource type (e.g., Patient, Obse ``` #### Custom Options -Since toFHIR uses Apache Spark in its core, you can give any option that is supported by Apache Spark. +Since Ignifyr uses Apache Spark in its core, you can give any option that is supported by Apache Spark. Available options for different source types can be found in the following links: - File System - CSV & TSV: https://spark.apache.org/docs/3.4.1/sql-data-sources-csv.html#data-source-option @@ -995,7 +926,7 @@ Please refer to the following files for full definitions: #### Sink Settings -toFHIR supports persisting the generated FHIR resources to a FHIR repository. The sink settings are defined in the mapping job definition file. +Ignifyr supports persisting the generated FHIR resources to a FHIR repository. The sink settings are defined in the mapping job definition file. The following example shows the sink settings for a FHIR repository: ```json @@ -1020,7 +951,7 @@ Or you can use a local file system to persist the generated FHIR resources: ``` #### Terminology Service -[A FHIR terminology service](https://hl7.org/fhir/terminology-service.html) can be automatically used by toFHIR to handle +[A FHIR terminology service](https://hl7.org/fhir/terminology-service.html) can be automatically used by Ignifyr to handle concept lookup and concept map operations. If a terminology service is configured, mapping definitions can use lookup and translation services for codes/values of codesystems/valuesets. @@ -1040,7 +971,7 @@ An available FHIR terminology service can be configured as in the following: ... 
``` -toFHIR provides a `LocalFhirTerminologyService` which allows to use text files for concept details and translations. You +Ignifyr provides a `LocalFhirTerminologyService` which allows using text files for concept details and translations. You can provide the concept map files or code/codesystem details by configuring the terminology service as in the following example: @@ -1066,7 +997,7 @@ example: } ... ``` -toFHIR's FHIRPath engine provides two functions becoming available when a terminology service is configured: +Ignifyr's FHIRPath engine provides two functions that become available when a terminology service is configured: - `trms:lookupDisplay`: Lookup the display name of a given code and code system - `trms:translateToCoding`: Translate the give code+codesystem within a valueset to the target code+codesystem (formatted as [Coding](https://hl7.org/fhir/datatypes.html#Coding)) within target valueset. @@ -1091,7 +1022,7 @@ you can do something like this. This creates a FHIR-Coding object automatically ``` #### Identity Service -toFHIR allows you to use a FHIR endpoint as and identity service in case FHIR resource identifiers need to be fetched given +Ignifyr allows you to use a FHIR endpoint as an identity service in case FHIR resource identifiers need to be fetched given the business identifiers. In this case, you can use the `idxs:resolveIdentifier` function with the following parameters: `idxs:resolveIdentifier(FHIR resource type, Identifier.value, Identifier.system)` which returns a FHIR reference such as `Patient/455435464698`. @@ -1106,9 +1037,9 @@ The following example puts the FHIR resource id of the Patient into the referenc #### Scheduled Jobs -toFHIR supports running scheduled jobs with defined time ranges. +Ignifyr supports running scheduled jobs with defined time ranges. To do so, you need to specify a cron expression in the mapping job definitions. 
-toFHIR uses [cron4j](https://www.sauronsoftware.it/projects/cron4j/) library to handle scheduled jobs. +Ignifyr uses [cron4j](https://www.sauronsoftware.it/projects/cron4j/) library to handle scheduled jobs. Scheduled patterns for the expression can be found in the documentation section of cron4j. Synchronization times for scheduled jobs are maintained in a folder defined `db-path` setting in the configuration file. @@ -1173,7 +1104,7 @@ after you run the mapping job, lets say at 2022-08-08T10:05:30, the following va #### Archiving -toFHIR supports archiving of erroneous records and the source data files. +Ignifyr supports archiving of erroneous records and the source data files. If you want to archive only the erroneous records, which are the records that could not be processed/mapped by the mapping job, you can specify the config in the mapping job definitions. The erroneous records are saved in the `erroneous-records-folder` defined in the sub-config of the `archiving` config in the configuration file. @@ -1233,4 +1164,121 @@ Or both. This will delete the source files after processing/mapping and save the While `archiveMode` works on a file-based basis, `saveErroneousRecords` works for each record/row in the source data. -Please also note that, the `archiveMode` config is only applicable for the file system source type. \ No newline at end of file +Please also note that, the `archiveMode` config is only applicable for the file system source type. + +#### Batching Strategy + +When dealing with large data sources (e.g., 10 million+ rows), loading all data into memory at once is not practical. +Ignifyr supports a **batching strategy** that allows you to process data in smaller batches by defining parameter sets that filter the source data. + +Each batch is processed sequentially: data is loaded, mapped, written to the sink, and then memory is freed before moving to the next batch. 
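
The load, map, write, free cycle described above can be sketched roughly as follows. This is an illustrative Python sketch only, not Ignifyr's implementation (the engine itself is written in Scala on top of Apache Spark); `run_batches`, `load_by_year`, `to_resource`, and the sample rows are hypothetical names and data introduced for this example.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, str]


def run_batches(parameter_sets: List[Dict[str, str]],
                load: Callable[[Dict[str, str]], Iterable[Row]],
                map_row: Callable[[Row], dict],
                sink: List[dict]) -> int:
    """Process one batch per parameter set, strictly one after another."""
    written = 0
    for params in parameter_sets:
        rows = list(load(params))                # 1. load only this batch's rows
        resources = [map_row(r) for r in rows]   # 2. map them to resources
        sink.extend(resources)                   # 3. write them to the sink
        written += len(resources)
        del rows, resources                      # 4. drop references before the next batch
    return written


# Hypothetical in-memory stand-in for a large source table.
SOURCE = [
    {"id": "m1", "year": "2018"},
    {"id": "m2", "year": "2019"},
    {"id": "m3", "year": "2019"},
    {"id": "m4", "year": "2021"},
]


def load_by_year(params: Dict[str, str]) -> List[Row]:
    # The parameter set acts as a filter on the source data.
    return [row for row in SOURCE if row["year"] == params["year"]]


def to_resource(row: Row) -> dict:
    return {"resourceType": "Observation", "id": row["id"]}


sink: List[dict] = []
total = run_batches([{"year": "2018"}, {"year": "2019"}], load_by_year, to_resource, sink)
print(total, [r["id"] for r in sink])  # 3 ['m1', 'm2', 'm3']
```

Rows that match no parameter set (here the 2021 row) are never loaded, so the parameter sets should cover all the data you intend to map.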
+ +##### Basic Usage + +Define a `batchingStrategy` in your `FhirMappingTask` along with a `preprocessSql` that uses parameter placeholders (prefixed with `$`): + +```json +{ + "name": "measurement-mapping", + "mappingRef": "https://example.com/measurement-mapping", + "sourceBinding": { + "source": { + "jsonClass": "SqlSource", + "query": "SELECT * FROM MEASUREMENT", + "preprocessSql": "SELECT * FROM MEASUREMENT WHERE EXTRACT(YEAR FROM MEASUREMENT_DATE) = $year" + } + }, + "batchingStrategy": { + "batchParameterSets": [ + {"year": "2018"}, + {"year": "2019"}, + {"year": "2020"} + ] + } +} +``` + +In this example, Ignifyr will execute 3 batches, one for each year. For each batch, the `$year` placeholder in `preprocessSql` is replaced with the value from the corresponding entry of `batchParameterSets`. + +##### Multiple Parameters + +You can use multiple parameters for finer-grained batching: + +```json +{ + "name": "encounter-mapping", + "mappingRef": "https://example.com/encounter-mapping", + "sourceBinding": { + "encounterMain": { + "jsonClass": "FileSystemSource", + "path": "encounters.csv", + "contentType": "csv", + "preprocessSql": "SELECT * FROM encounterMain WHERE EXTRACT(YEAR FROM ENCOUNTER_DATE) = $year AND EXTRACT(MONTH FROM ENCOUNTER_DATE) = $month" + } + }, + "batchingStrategy": { + "batchParameterSets": [ + {"year": "2020", "month": "01"}, + {"year": "2020", "month": "02"}, + {"year": "2020", "month": "03"} + ] + } +} +``` + +##### ID Ranges with Custom Logic + +```json +{ + "preprocessSql": "SELECT * FROM PATIENTS WHERE patient_id BETWEEN $id_start AND $id_end", + "batchingStrategy": { + "batchParameterSets": [ + {"id_start": "1", "id_end": "100000"}, + {"id_start": "100001", "id_end": "200000"}, + {"id_start": "200001", "id_end": "300000"}, + {"id_start": "300001", "id_end": "400000"} + ] + } +} +``` + +**Key points:** +- Parameter placeholders in `preprocessSql` use the `$parameterName` syntax +- Every placeholder used in `preprocessSql` must have a matching key in each object of `batchParameterSets` +- Batches are 
processed sequentially, reducing memory footprint +- Only sources with `preprocessSql` containing parameter placeholders are affected by the batching strategy + +##### Generating Parameter Sets + +For large ranges (e.g., 40 years × 12 months), you can generate `batchParameterSets` programmatically with a short script such as the following: + +```python +import json + +def generate_year_month_batches(start_year, end_year): + """Generate batch parameter sets for year-month combinations""" + batches = [] + for year in range(start_year, end_year + 1): + for month in range(1, 13): + batches.append({ + "year": str(year), + "month": str(month) + }) + return batches + +# Generate batches for 1980-2020 (41 years, both ends inclusive) +batches = generate_year_month_batches(1980, 2020) + +print(f"Total batches: {len(batches)}") # Prints: Total batches: 492 + +# Create the batching strategy +batching_strategy = { + "batchParameterSets": batches +} + +# Save to file +with open('batching_strategy.json', 'w') as f: + json.dump(batching_strategy, f, indent=2) + +print("Batching strategy saved to batching_strategy.json") +``` diff --git a/pom.xml b/pom.xml index 182cfc7a..bf0b0be0 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ 2.2.224 12.1.0.0 2.2.5 - 1.19.3 + 1.21.4 2.8.5 10.5.3 0.10.2 diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala index 74d1f6e5..7fffbdd1 100644 --- a/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala +++ b/tofhir-engine/src/main/scala/io/tofhir/engine/mapping/job/FhirMappingJobManager.scala @@ -289,6 +289,36 @@ class FhirMappingJobManager( } } + /** + * Substitutes batch parameters in the preprocessSql of all source bindings in a mapping task. + * Parameters in preprocessSql are specified as $parameterName and will be replaced with the corresponding value. 
+ * + * @param task The original mapping task + * @param parameters Map of parameter names to their values (e.g., Map("year" -> "2014", "month" -> "1")) + * @return A new mapping task with substituted preprocessSql in all source bindings + */ + private def substituteBatchParameters(task: FhirMappingTask, parameters: Map[String, String]): FhirMappingTask = { + val updatedSourceBinding = task.sourceBinding.map { case (alias, binding) => + val updatedBinding = binding.preprocessSql match { + case Some(sql) => + val substitutedSql = parameters.foldLeft(sql) { case (currentSql, (paramName, paramValue)) => + currentSql.replace(s"$$$paramName", paramValue) + } + // Create a new binding with the substituted SQL based on the binding type + binding match { + case fs: FileSystemSource => fs.copy(preprocessSql = Some(substitutedSql)) + case ss: SqlSource => ss.copy(preprocessSql = Some(substitutedSql)) + case ks: KafkaSource => ks.copy(preprocessSql = Some(substitutedSql)) + case fss: FhirServerSource => fss.copy(preprocessSql = Some(substitutedSql)) + case other => other // For any other type, return as-is + } + case None => binding + } + alias -> updatedBinding + } + task.copy(sourceBinding = updatedSourceBinding) + } + /** * Read the source data, divide it into chunks and execute the mapping (first mapping task in the Fhir Mapping Job * Execution) and write each chunk sequentially @@ -308,7 +338,65 @@ class FhirMappingJobManager( identityServiceSettings: Option[IdentityServiceSettings] = None, timeRange: Option[(LocalDateTime, LocalDateTime)] = None): Future[Unit] = { val mappingTask = mappingJobExecution.mappingTasks.head - logger.debug(s"Reading source data for mapping ${mappingTask.name} within mapping job ${mappingJobExecution.jobId} ...") + + // Check if this task has a batching strategy defined + mappingTask.batchingStrategy match { + case Some(strategy) if strategy.batchParameterSets.nonEmpty => + // Execute the mapping task for each batch parameter set sequentially 
+ val totalBatches = strategy.batchParameterSets.size + logger.debug(s"Batching strategy defined for mapping ${mappingTask.name} with $totalBatches batches") + + strategy.batchParameterSets.zipWithIndex.foldLeft(Future.successful(())) { case (previousFuture, (batchParams, batchIndex)) => + previousFuture.flatMap { _ => + val batchNumber = batchIndex + 1 + val isLastBatch = batchNumber == totalBatches + logger.debug(s"Processing batch $batchNumber/$totalBatches for mapping ${mappingTask.name} with parameters: $batchParams") + + // Substitute the batch parameters in the task's preprocessSql + val batchTask = substituteBatchParameters(mappingTask, batchParams) + + // Execute the batch, only log final result on last batch + executeSingleBatch(mappingJobExecution.copy(mappingTasks = Seq(batchTask)), sourceSettings, fhirWriter, + terminologyServiceSettings, identityServiceSettings, timeRange, Some(batchNumber), Some(totalBatches), Some(batchParams), isLastBatch) + } + } + + case _ => + // No batching strategy, execute normally (always the last/only batch) + executeSingleBatch(mappingJobExecution, sourceSettings, fhirWriter, terminologyServiceSettings, + identityServiceSettings, timeRange, None, None, None, isLastBatch = true) + } + } + + /** + * Execute a single batch of the mapping task. This is called either once (when no batching strategy) + * or multiple times (once for each batch parameter set). 
+ * + * @param mappingJobExecution Fhir Mapping Job execution + * @param sourceSettings The source settings of the mapping job + * @param fhirWriter FHIR writer + * @param terminologyServiceSettings Terminology service settings + * @param identityServiceSettings Identity service settings + * @param timeRange Time range for the source data to load + * @param batchNumber Current batch number (1-based), None if no batching + * @param totalBatches Total number of batches, None if no batching + * @param batchParams Parameters for this batch, None if no batching + * @param isLastBatch Whether this is the last batch (used to determine when to log final execution result) + * @return + */ + private def executeSingleBatch(mappingJobExecution: FhirMappingJobExecution, + sourceSettings: Map[String, MappingJobSourceSettings], + fhirWriter: BaseFhirWriter, + terminologyServiceSettings: Option[TerminologyServiceSettings], + identityServiceSettings: Option[IdentityServiceSettings], + timeRange: Option[(LocalDateTime, LocalDateTime)], + batchNumber: Option[Int], + totalBatches: Option[Int], + batchParams: Option[Map[String, String]], + isLastBatch: Boolean): Future[Unit] = { + val mappingTask = mappingJobExecution.mappingTasks.head + val batchInfo = batchNumber.map(n => s" [batch $n/${totalBatches.getOrElse("?")}]").getOrElse("") + logger.debug(s"Reading source data for mapping ${mappingTask.name}$batchInfo within mapping job ${mappingJobExecution.jobId} ...") val (fhirMapping, mds, df) = readJoinSourceData(mappingTask, sourceSettings, timeRange, jobId = Some(mappingJobExecution.jobId)) // Cache the DataFrame to avoid re-reading the source data multiple times during processing. // This is particularly useful when using chunking (e.g., via ToFhirConfig.engineConfig.maxChunkSizeForMappingJobs), @@ -316,16 +404,17 @@ class FhirMappingJobManager( // and reused across all chunks, improving performance. 
     df.cache()
     val sizeOfDf: Long = df.count()
-    logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.name} within mapping job ${mappingJobExecution.jobId} ...")
+    val batchParamsInfo = batchParams.map(p => s" with params: $p").getOrElse("")
+    logger.debug(s"$sizeOfDf records read for mapping ${mappingTask.name}$batchInfo$batchParamsInfo within mapping job ${mappingJobExecution.jobId} ...")
     val result = ToFhirConfig.engineConfig.maxChunkSizeForMappingJobs match {
       //If not specify run it as single chunk
       case None =>
-        logger.debug(s"Executing the mapping ${mappingTask.name} within job ${mappingJobExecution.jobId} ...")
+        logger.debug(s"Executing the mapping ${mappingTask.name}$batchInfo within job ${mappingJobExecution.jobId} ...")
         executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
           .map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
       case Some(chunkSize) if sizeOfDf < chunkSize =>
-        logger.debug(s"Executing the mapping ${mappingTask.name} within job ${mappingJobExecution.jobId} ...")
+        logger.debug(s"Executing the mapping ${mappingTask.name}$batchInfo within job ${mappingJobExecution.jobId} ...")
         executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings, identityServiceSettings, Some(mappingJobExecution.id), Some(mappingJobExecution.projectId))
           .map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter)) // Write the created FHIR Resources to the FhirWriter
       //Otherwise divide the data into chunks
@@ -339,15 +428,19 @@ class FhirMappingJobManager(
           case (fj, (df, i)) => fj.flatMap(_ => executeTask(mappingJobExecution.jobId, mappingTask.name, fhirMapping, df, mds, terminologyServiceSettings,
             identityServiceSettings, Some(mappingJobExecution.id), projectId = Some(mappingJobExecution.projectId))
             .map(dataset => SinkHandler.writeMappingResult(spark, mappingJobExecution, mappingTask.name, dataset, fhirWriter))
-            .map(_ => logger.debug(s"Chunk ${i + 1} / $numOfChunks is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}..."))
+            .map(_ => logger.debug(s"Chunk ${i + 1} / $numOfChunks$batchInfo is completed for mapping ${mappingTask.name} within MappingJob: ${mappingJobExecution.jobId}..."))
           )
         }
     }
     result.map(r => {
       // Remove the DataFrame from cache after processing to free up memory resources.
       df.unpersist()
-      // log the result of mapping task execution
-      ExecutionLogger.logExecutionResultForBatchMappingTask(mappingJobExecution.id)
+      // Only log the final result of mapping task execution when this is the last batch
+      // For batching strategy with multiple batches, this ensures the execution cache entry
+      // is only removed after all batches complete
+      if (isLastBatch) {
+        ExecutionLogger.logExecutionResultForBatchMappingTask(mappingJobExecution.id)
+      }
       r
     })
   }
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/BatchingStrategy.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/BatchingStrategy.scala
new file mode 100644
index 00000000..6d41803d
--- /dev/null
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/BatchingStrategy.scala
@@ -0,0 +1,28 @@
+package io.tofhir.engine.model
+
+/**
+ * Batching strategy for processing source data in multiple batches.
+ * This allows defining custom batching logic per mapping task, such as processing data year by year,
+ * ID range by ID range, or any custom grouping criteria.
+ *
+ * @param batchParameterSets Sequence of parameter sets, where each set represents one batch.
+ *                           Each parameter set is a map of (parameterName -> parameterValue).
+ *                           All parameters in a set will be substituted in preprocessSql as $parameterName.
+ *
+ * Example for single parameter (year):
+ *   batchParameterSets = Seq(
+ *     Map("year" -> "2014"),
+ *     Map("year" -> "2015"),
+ *     Map("year" -> "2016")
+ *   )
+ *
+ * Example for multiple parameters (year + month):
+ *   batchParameterSets = Seq(
+ *     Map("year" -> "2020", "month" -> "1"),
+ *     Map("year" -> "2020", "month" -> "2"),
+ *     Map("year" -> "2020", "month" -> "3"),
+ *     Map("year" -> "2021", "month" -> "1"),
+ *     ...
+ *   )
+ */
+case class BatchingStrategy(batchParameterSets: Seq[Map[String, String]])
diff --git a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala
index 999c87a8..20939773 100644
--- a/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala
+++ b/tofhir-engine/src/main/scala/io/tofhir/engine/model/FhirMappingTask.scala
@@ -10,8 +10,11 @@ package io.tofhir.engine.model
  * @param sourceBinding A map that provides details on how to load each source data for the mapping.
  *                      It links the source settings of a mapping job to the sources of a mapping.
  * @param mapping       FhirMapping definition to execute
+ * @param batchingStrategy Optional batching strategy to process data in multiple batches based on custom parameters
+ *                         (e.g., by year, ID range, or any custom grouping). If provided, the mapping will be executed
+ *                         once for each parameter set, with each parameter of the set substituted in preprocessSql as $parameterName
  */
-case class FhirMappingTask(name: String, mappingRef: String, sourceBinding: Map[String, MappingSourceBinding], mapping: Option[FhirMapping] = None)
+case class FhirMappingTask(name: String, mappingRef: String, sourceBinding: Map[String, MappingSourceBinding], mapping: Option[FhirMapping] = None, batchingStrategy: Option[BatchingStrategy] = None)


 /**
diff --git a/tofhir-server/api.yaml b/tofhir-server/api.yaml
index 51452b02..1f30e2d1 100644
--- a/tofhir-server/api.yaml
+++ b/tofhir-server/api.yaml
@@ -2667,6 +2667,29 @@ components:
               - $ref: "#/components/schemas/SqlSource"
               - $ref: "#/components/schemas/KafkaSource"
               - $ref: "#/components/schemas/FhirServerSource"
+        batchingStrategy:
+          $ref: "#/components/schemas/BatchingStrategy"
+
+    BatchingStrategy:
+      description: "Batching strategy for processing source data in multiple batches based on custom parameters"
+      type: object
+      properties:
+        batchParameterSets:
+          type: array
+          description: "Sequence of parameter sets, where each set represents one batch. Each parameter set is a map of (parameterName -> parameterValue). All parameters in a set will be substituted in preprocessSql as $parameterName."
+          items:
+            type: object
+            additionalProperties:
+              type: string
+          example:
+            - year: "2020"
+              month: "1"
+            - year: "2020"
+              month: "2"
+            - year: "2021"
+              month: "1"
+      required:
+        - batchParameterSets

     SchedulingSettings:
       type: object
diff --git a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
index 11fec3c7..1099f11e 100644
--- a/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
+++ b/tofhir-server/src/main/scala/io/tofhir/server/service/ExecutionService.scala
@@ -201,7 +201,7 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
     logger.debug(s"Testing the mapping ${testResourceCreationRequest.fhirMappingTask.mappingRef} inside the job $jobId by selecting ${testResourceCreationRequest.resourceFilter.numberOfRows} ${testResourceCreationRequest.resourceFilter.order} records.")

     // If an unmanaged mapping is provided within the mapping task, normalize the context urls
-    val mappingTask: FhirMappingTask =
+    val mappingTaskWithNormalizedUrls: FhirMappingTask =
       testResourceCreationRequest.fhirMappingTask.mapping match {
         case None => testResourceCreationRequest.fhirMappingTask
         case _ =>
@@ -213,6 +213,16 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
           testResourceCreationRequest.fhirMappingTask.copy(mapping = Some(mappingWithNormalizedContextUrls))
       }
+    // If the mapping task has a batching strategy, substitute the first batch parameter set
+    // This allows testing of parameterized preprocessSql queries with sample parameters
+    val mappingTask: FhirMappingTask = mappingTaskWithNormalizedUrls.batchingStrategy match {
+      case Some(strategy) if strategy.batchParameterSets.nonEmpty =>
+        val firstBatchParams = strategy.batchParameterSets.head
+        logger.debug(s"Testing with first batch parameters: $firstBatchParams")
+        substituteBatchParameters(mappingTaskWithNormalizedUrls, firstBatchParams)
+      case _ => mappingTaskWithNormalizedUrls
+    }
+
+
     val fhirMappingJobManager = new FhirMappingJobManager(
       toFhirEngine.mappingRepo,
       toFhirEngine.contextLoader,
@@ -394,6 +404,36 @@ class ExecutionService(jobRepository: IJobRepository, mappingRepository: IMappin
       toFhirEngine.runningJobRegistry.isJobRunning(jobId)
     }
   }
+
+  /**
+   * Substitutes batch parameters in the preprocessSql of all source bindings in a mapping task.
+   * Parameters in preprocessSql are specified as $parameterName and will be replaced with the corresponding value.
+   *
+   * @param task       The original mapping task
+   * @param parameters Map of parameter names to their values (e.g., Map("year" -> "2014", "month" -> "1"))
+   * @return A new mapping task with substituted preprocessSql in all source bindings
+   */
+  private def substituteBatchParameters(task: FhirMappingTask, parameters: Map[String, String]): FhirMappingTask = {
+    val updatedSourceBinding = task.sourceBinding.map { case (alias, binding) =>
+      val updatedBinding = binding.preprocessSql match {
+        case Some(sql) =>
+          val substitutedSql = parameters.foldLeft(sql) { case (currentSql, (paramName, paramValue)) =>
+            currentSql.replace(s"$$$paramName", paramValue)
+          }
+          // Create a new binding with the substituted SQL based on the binding type
+          binding match {
+            case fs: FileSystemSource => fs.copy(preprocessSql = Some(substitutedSql))
+            case ss: SqlSource => ss.copy(preprocessSql = Some(substitutedSql))
+            case ks: KafkaSource => ks.copy(preprocessSql = Some(substitutedSql))
+            case fss: FhirServerSource => fss.copy(preprocessSql = Some(substitutedSql))
+            case other => other // For any other type, return as-is
+          }
+        case None => binding
+      }
+      alias -> updatedBinding
+    }
+    task.copy(sourceBinding = updatedSourceBinding)
+  }
 }
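
Review note on `substituteBatchParameters`: the substitution is a plain string `replace` of `$paramName`, applied once per entry of the parameter map. A minimal standalone sketch of that behavior follows. It is not part of the diff: the object name, both function names, and the sample SQL (table `visits`, column `y`) are hypothetical, and it works on a bare `String` rather than on a `FhirMappingTask`. It also illustrates one caveat that may be worth considering: when one parameter name is a prefix of another (for example `year` and `yearEnd`), replacing `$year` can also rewrite part of `$yearEnd`. Replacing longer names first is one possible mitigation, shown here as an assumption rather than as the project's approach.

```scala
// Hypothetical standalone sketch; names and sample SQL are illustrative only.
object BatchParameterSubstitution {

  /** Same idea as in the diff: replace each $name with its value, in map iteration order. */
  def substituteNaive(sql: String, parameters: Map[String, String]): String =
    parameters.foldLeft(sql) { case (currentSql, (name, value)) =>
      // s"$$$name" renders as a literal '$' followed by the parameter name
      currentSql.replace(s"$$$name", value)
    }

  /** Assumed variant: replace longer names first so $year cannot clobber $yearEnd. */
  def substituteLongestFirst(sql: String, parameters: Map[String, String]): String =
    parameters.toSeq
      .sortBy { case (name, _) => -name.length } // longest parameter name first
      .foldLeft(sql) { case (currentSql, (name, value)) =>
        currentSql.replace(s"$$$name", value)
      }

  def main(args: Array[String]): Unit = {
    val sql = "SELECT * FROM visits WHERE y >= $year AND y <= $yearEnd"
    val params = Map("year" -> "2014", "yearEnd" -> "2016")
    println(substituteLongestFirst(sql, params))
  }
}
```

With parameter sets such as `Map("year" -> "2020", "month" -> "1")` from the examples in `BatchingStrategy`, no name is a prefix of another, so both variants give the same result.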