Convert the PICORI CDM dataset stored as Parquet files at ~/datasets/stroke_data into a fully standards-compliant OMOP CDM v5.4.2 instance compatible with OHDSI tools and analyses (Achilles, DQD, PLP, CohortMethod, ATLAS).
- OHDSI organization repositories (tools, packages, guidance): OHDSI GitHub
- OMOP Common Data Model v5.4 (DDL, constraints, documentation): CommonDataModel
- Standard queries and canonical algorithms (e.g., era building, utilities): OMOP-Queries
- OMOP–PCORnet alignment background (construct mapping guidance): OMOP–PCORnet slides
Target CDM version: v5.4.2. Do not alter the OMOP DDLs; any changes must be data-level only.
- Location: `~/datasets/stroke_data` (Parquet files). Assumed to be PICORI/PCORnet CDM-like tables (e.g., DEMOGRAPHIC, ENCOUNTER, DIAGNOSIS, PROCEDURES, VITAL, LAB_RESULT_CM, PRESCRIBING, DISPENSING, IMMUNIZATION, OBS_CLIN, OBS_GEN, PRO_CM, ENROLLMENT, DEATH, PROVIDER/FACILITY/SITE, COST/CHARGE where available).
- PostgreSQL database with the OMOP v5.4.2 CDM in schema `cdm`, optional `staging` for intermediate loads, and `results` for Achilles/DQD outputs.
- Fully loaded OMOP vocabularies from an Athena snapshot (recorded version/date).
- Populated OMOP tables across all applicable domains with enforced constraints.
- Generated eras (`condition_era`, `drug_era`, `dose_era`) via canonical OHDSI queries.
- Validation reports: DQD results, Achilles results, reconciliation and row-count checks.
- CDM v6 migration; NLP/notes extraction; federated network configuration.
- Compute/ETL: PySpark (Spark 3.3–3.5), Python 3.10+, Java 8/11.
- Database: PostgreSQL 14/15.
- R toolchain (for validation): R 4.2–4.4 with Achilles and DataQualityDashboard.
- File system: Local Parquet inputs, local staging CSVs for reference mappings when needed.
- Provision PostgreSQL and create `cdm`, `staging`, and `results` schemas.
- Load OMOP v5.4.2 DDLs exactly as provided by OHDSI.
- Load Athena vocabularies into `cdm`.
- Profile source Parquet tables and confirm the PCORnet version.
- Execute PySpark ETL by domain: produce dimension and fact tables into `staging` (optional) and then into `cdm`.
- Build eras using OMOP-Queries SQL.
- Run DQD and Achilles, fix issues, iterate until clean.
PICORI2OMOP/
plan.md # This document
README.md
LICENSE
etl/
config/
etl_config.yml # Paths, DB, vocabulary version, run params
secrets.example.yml # Example secret values (no secrets committed)
spark/
common/
io_utils.py
mapping_utils.py
units.py
ids.py
validation.py
load_person.py
load_visits.py
load_condition.py
load_procedure.py
load_drug.py
load_measurement.py
load_observation.py
load_death.py
load_cost.py
build_eras.py
mappings/
encounter_type.csv # PCORnet enc_type -> OMOP visit concepts
dx_type.csv # diagnosis provenance -> type_concept_id
proc_type.csv # procedure provenance -> type_concept_id
drug_type.csv # prescription vs dispensing -> type_concept_id
units.csv # source unit -> UCUM concept + multiplier
value_sets.csv # categorical result/value -> standard concepts
site_care_site.csv # site/facility ids -> care_site
provider_map.csv # provider ids mapping (if needed)
sql/
create_schemas.sql
cdm_ddl_postgresql.sql # From OHDSI/CommonDataModel (not edited)
vocab_load.sql # COPY/\copy commands for vocabularies
eras/
condition_era.sql
drug_era.sql
dose_era.sql
checks/
pk_fk_checks.sql
not_null_checks.sql
rowcounts.sql
scripts/
bootstrap.sh # One-shot setup: DB, schemas, vocab load
run_etl.sh # Orchestrates domain loads
run_validation.sh # DQD + Achilles
docs/
mapping_spec.xlsx # Field-level mapping spec
decisions_log.md
data_dictionary.md
- Linux host with sufficient memory (≥16 GB recommended for Spark local).
- Java 11, Python 3.10+, Spark 3.4/3.5 (standalone local).
- PostgreSQL 15 and the `psql` client, or Docker.
- R 4.3+ with `remotes` to install OHDSI packages.
docker run -d --name omop-pg -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=omop -p 5432:5432 postgres:15

File: etl/sql/create_schemas.sql
CREATE SCHEMA IF NOT EXISTS cdm;
CREATE SCHEMA IF NOT EXISTS staging;
CREATE SCHEMA IF NOT EXISTS results;

Apply:

psql postgresql://postgres:postgres@localhost:5432/omop -f etl/sql/create_schemas.sql

Download from CommonDataModel (PostgreSQL folder for v5.4). Place as etl/sql/cdm_ddl_postgresql.sql.

psql postgresql://postgres:postgres@localhost:5432/omop -v schema=cdm -f etl/sql/cdm_ddl_postgresql.sql

Note: Do not modify official DDLs. Use -v schema=cdm only if the script supports schema parameterization; otherwise set search_path to cdm.
- Request/download Athena vocabulary bundle (include SNOMED, RxNorm, LOINC, CPT4, HCPCS, ICD9/10, UCUM, etc.).
- Unzip to a local folder, e.g., `~/athena/`.
- Load tables: `vocabulary`, `concept`, `concept_synonym`, `relationship`, `concept_relationship`, `concept_ancestor`, `drug_strength`.
Example psql session (adapt paths accordingly):
\copy cdm.vocabulary FROM '~/athena/VOCABULARY.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.concept FROM '~/athena/CONCEPT.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.concept_synonym FROM '~/athena/CONCEPT_SYNONYM.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.relationship FROM '~/athena/RELATIONSHIP.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.concept_relationship FROM '~/athena/CONCEPT_RELATIONSHIP.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.concept_ancestor FROM '~/athena/CONCEPT_ANCESTOR.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');
\copy cdm.drug_strength FROM '~/athena/DRUG_STRENGTH.csv' WITH (FORMAT csv, HEADER true, DELIMITER E'\t', QUOTE E'\b');

Record and fix the exact Athena version/date in etl/config/etl_config.yml.
File: etl/config/etl_config.yml
source:
parquet_root: "/home/asadr/datasets/stroke_data"
target:
jdbc_url: "jdbc:postgresql://localhost:5432/omop"
db_user: "postgres"
db_password_env: "OMOP_DB_PASSWORD" # read from environment
cdm_schema: "cdm"
staging_schema: "staging"
results_schema: "results"
vocabulary:
snapshot_date: "2025-09-30"
enforce_standard_only: true
etl:
spark_master: "local[*]"
partitions: 8
batch_size_rows: 50000
write_mode: "append" # staging
timezone: "UTC"
ids:
strategy: "mapping_table" # mapping_table | sequence | hash
salt_env: "OMOP_ID_SALT" # used if strategy == hash
logging:
level: "INFO"
path: "etl/logs"
validation:
run_dqd: true
  run_achilles: true

- Concepts
  - Populate `*_concept_id` with Standard concepts only (concept.standard_concept = 'S').
  - Keep original codes/strings in `*_source_value` and, where mappable, `*_source_concept_id` (often Non-Standard).
  - If no mapping exists: set `*_concept_id = 0` and still populate `*_source_value`.
- Domain routing
  - Route records to OMOP tables based on the Standard concept's `domain_id` (e.g., Measurement vs Observation).
- Visit modeling
  - Build `visit_occurrence` from PCORnet `ENCOUNTER`.
  - Map `enc_type` to OMOP visit concepts (e.g., Inpatient, Outpatient, Emergency Room). Maintain an external `mappings/encounter_type.csv` and query the vocabulary to resolve final concept_ids.
  - Use `visit_detail` for unit-level details (bed/ward) if available.
- Dates/times
  - Populate `*_date` when only dates are present; fill `*_datetime` only if the source provides a time. Do not fabricate time components.
- Units and numeric values
  - Normalize to UCUM; record the original unit in `unit_source_value` and the target in `unit_concept_id`.
  - Apply deterministic multipliers for unit conversions; document them in `mappings/units.csv`.
- Type concepts
  - Populate `*_type_concept_id` according to provenance (EHR, billing, problem list, lab test, prescription written vs dispensed). Maintain external CSV mappings from source provenance to Type Concepts.
- Keys
  - Use stable surrogate keys via mapping tables (preferred) or deterministic hashing with a salt.
  - Maintain persistent crosswalks: `PATID` → `person_id`, `ENCOUNTERID` → `visit_occurrence_id`, `PROVIDERID` → `provider_id`.
- Observation periods
  - Prefer `ENROLLMENT` for `observation_period` start/end per person; otherwise derive from min/max clinical facts.
- Provenance and traceability
  - Populate `*_type_concept_id` and `*_source_value` everywhere; keep `provider_id` and `visit_occurrence_id` links when available.
- De-duplication
  - Deduplicate exact duplicates post-mapping at person–date–code granularity; retain the earliest load timestamp.
- Constraints
  - Respect OMOP DDL constraints (not-null, FK). Do not modify the schema. Log and quarantine violating rows with reproducible filters.
- Era building
  - Use OHDSI canonical SQL from OMOP-Queries for `condition_era`, `drug_era`, `dose_era`.
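The domain-routing convention above amounts to a lookup from the Standard concept's `domain_id` to a target fact table. A minimal illustration (the fallback of unrouted domains to `observation` is an assumption to record in docs/decisions_log.md, not a CDM rule):

```python
# Map a Standard concept's domain_id to its OMOP fact table.
DOMAIN_TO_TABLE = {
    "Condition": "condition_occurrence",
    "Procedure": "procedure_occurrence",
    "Drug": "drug_exposure",
    "Measurement": "measurement",
    "Observation": "observation",
    "Device": "device_exposure",
}

def route_table(domain_id: str) -> str:
    """Return the OMOP target table for a mapped record; fallback is a local decision."""
    return DOMAIN_TO_TABLE.get(domain_id, "observation")

print(route_table("Measurement"))  # measurement
print(route_table("Meas Value"))   # observation (fallback for unrouted domains)
```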
Maintain a detailed docs/mapping_spec.xlsx with column-level mappings, transformations, and concept mapping rules. Summary guidance:
- `PATID` → `person.person_id` (via mapping table or hash). Store the original in `person_source_value` (present in v5.4).
- Sex → `gender_concept_id` (Standard); unknown → OMOP Unknown or 0.
- Race → `race_concept_id` (Standard) and `race_source_value`.
- Hispanic → `ethnicity_concept_id` (Standard) and `ethnicity_source_value`.
- Birth date → `year_of_birth`, `month_of_birth`, `day_of_birth`.
- Address → `location` (if present); facility/site → `care_site`.
- `ENC_TYPE` → `visit_concept_id` via mapping; keep `admitting_source_concept_id` and `discharge_to_concept_id` if mappable.
- Start/end datetime; `provider_id`, `care_site_id` if present.
- ICD-9/10-CM → SNOMED via `Maps to`; `condition_concept_id` Standard; keep the source code in `condition_source_value`/`condition_source_concept_id`.
- `DX_TYPE`/`PDX` → `condition_status_concept_id` (principal, secondary) and `condition_type_concept_id` (provenance).
- Date linked to visit; set `visit_occurrence_id`.
- CPT4/HCPCS/ICD-10-PCS → Standard (often SNOMED, or ICD-10-PCS where it is itself Standard).
- `procedure_type_concept_id` from provenance; modifiers if available.
- NDC → RxNorm Standard via `Maps to`, or via `concept_relationship` using RxNorm/RxNorm Extension.
- Distinguish written vs dispensed using `drug_type_concept_id`.
- Quantity, days supply, route, dose; normalize units; link to visit.
- Lab test code → LOINC Standard; numeric results → `value_as_number`; categorical → `value_as_concept_id` via value mapping.
- Units → UCUM; reference ranges; abnormal flags.
- Map each vital (BP systolic/diastolic, HR, temp, height, weight, BMI) to LOINC/Standard; convert units.
- Route by the resulting Standard concept's domain; categorical values → `value_as_concept_id`.
- Map vaccine codes to RxNorm; use `drug_type_concept_id` for the immunization context.
- Date of death; causes via `condition_occurrence` or `observation` as available.
- Start/end by `PATID`; enrollment type to `payer_plan_period` if present.
- Map identifiers and attributes; maintain crosswalks for stable IDs.
- Attach costs to clinical facts; store amounts/currency consistently.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("PICORI2OMOP")
    .master("local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate())

demographic_df = spark.read.parquet("/home/asadr/datasets/stroke_data/DEMOGRAPHIC.parquet")
encounter_df = spark.read.parquet("/home/asadr/datasets/stroke_data/ENCOUNTER.parquet")
# ... others ...

- Use broadcast joins for `concept` and `concept_relationship` (or cache dimension-sized subsets by vocabulary).
- For a code in vocabulary V (e.g., ICD10CM), first find the source `concept_id` by `concept_code` + `vocabulary_id`, then join via `concept_relationship` (relationship_id = 'Maps to') to a Standard target where `invalid_reason IS NULL`.
- If multiple targets: generate multiple rows (preferred) OR define a deterministic selection per domain (documented in `docs/decisions_log.md`).
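The one-to-many case can be sketched in plain Python: a source code with several `Maps to` targets expands into one output row per Standard concept, and an unmapped code yields concept_id 0. The tiny in-memory lookup and its concept_ids are invented for illustration:

```python
# Hypothetical (vocabulary_id, concept_code) -> Standard target concept_ids.
maps_to = {
    ("ICD10CM", "I63.9"): [443454],           # single target
    ("ICD10CM", "E11.21"): [201826, 192279],  # two targets -> two output rows
}

def expand_record(rec):
    """Emit one row per Standard target; concept_id = 0 when unmapped."""
    targets = maps_to.get((rec["vocab"], rec["code"]), [0])
    return [dict(rec, concept_id=t) for t in targets]

rows = expand_record({"vocab": "ICD10CM", "code": "E11.21", "patid": "P1"})
print(len(rows))  # 2
```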
Example SQL for diagnosis mapping sanity:
SELECT s.concept_id AS source_concept_id,
s.concept_code AS source_code,
r.relationship_id,
t.concept_id AS standard_concept_id,
t.domain_id,
t.standard_concept
FROM cdm.concept s
JOIN cdm.concept_relationship r ON r.concept_id_1 = s.concept_id AND r.relationship_id = 'Maps to'
JOIN cdm.concept t ON t.concept_id = r.concept_id_2
WHERE s.vocabulary_id = 'ICD10CM'
AND s.concept_code = 'I63.9'
AND r.invalid_reason IS NULL
  AND t.invalid_reason IS NULL;

Maintain etl/mappings/units.csv:
source_unit,ucum_unit,unit_concept_id,multiplier,notes
mg/dL,mg/dL,8840,1.0,
g/dL,g/dL,8713,1.0,
mmol/L,mmol/L,8753,1.0,
"10^9/L","10^9/L",9448,1.0,
kg,kg,9529,1.0,
lb,kg,9529,0.45359237,"convert lb to kg"
Apply conversion before writing numeric value_as_number and set unit_concept_id accordingly.
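The conversion step can be sketched as a small helper keyed on the CSV rows (the in-memory table below mirrors two rows of the sample units.csv; in the real ETL the mapping is read from the file):

```python
# source_unit -> (ucum_unit, unit_concept_id, multiplier), mirroring units.csv.
UNIT_MAP = {
    "mg/dL": ("mg/dL", 8840, 1.0),
    "lb": ("kg", 9529, 0.45359237),
}

def normalize(value, source_unit):
    """Apply the deterministic multiplier and return (value, unit_concept_id, ucum_unit)."""
    ucum, concept_id, mult = UNIT_MAP[source_unit]
    return value * mult, concept_id, ucum

print(normalize(150, "lb"))  # lb converted to kg with UCUM concept 9529
```

The original `unit_source_value` is still written alongside the converted number, so the transformation remains reversible.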
File: etl/mappings/encounter_type.csv
enc_type,omop_visit_concept_id,notes
AV,9202,Outpatient Visit (example; confirm from vocabulary)
ED,9203,Emergency Room Visit
EI,262,ER and Inpatient Visit
IP,9201,Inpatient Visit
IS,9201,Inpatient Stay
OA,9202,Other Ambulatory
OS,9202,Observation/Outpatient
HH,581476,Home health (example; validate)
NI,0,No information
UN,0,Unknown
Note: The concept IDs above are examples; resolve via queries against loaded vocabularies to avoid hard-coding.
Use vocabulary-driven selection where vocabulary_id = 'Type Concept'.
File: etl/mappings/dx_type.csv
source_dx_type,condition_type_concept_id,condition_status_concept_id,notes
AD,38000230,0,Admitting diagnosis (example; validate)
PD,38000201,32902,Principal diagnosis (example; validate)
SD,38000245,0,Secondary diagnosis (example; validate)
Preferred: persistent mapping tables in the database, populated and reused across runs.
- staging.id_map_person(source_id TEXT PRIMARY KEY, person_id BIGINT UNIQUE)
- staging.id_map_visit(source_id TEXT PRIMARY KEY, visit_occurrence_id BIGINT UNIQUE)
- staging.id_map_provider(...), staging.id_map_care_site(...), etc.
Populate with monotonic sequences from PostgreSQL or allocate ID ranges to Spark tasks:
CREATE SEQUENCE IF NOT EXISTS cdm.person_id_seq START 100000 INCREMENT 1;

For batch inserts, acquire nextval blocks per partition, or generate in the DB via INSERT ... RETURNING when writing smaller dimension tables.
Alternative: deterministic hashing
person_id = abs(hash_siphash24(salt || PATID)) % 9000000000 + 1000000000
Maintain staging.id_map_* to preserve stability over time.
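A minimal Python sketch of the deterministic-hash formula above, assuming BLAKE2b keyed hashing from the standard library as a stand-in for siphash (the function name and range constants follow the plan's formula):

```python
import hashlib

def hashed_person_id(patid: str, salt: bytes) -> int:
    """Deterministic surrogate id: keyed hash of PATID mapped into a reserved range."""
    digest = hashlib.blake2b(patid.encode(), key=salt, digest_size=8).digest()
    # Same shape as the plan's formula: % 9_000_000_000 + 1_000_000_000.
    return int.from_bytes(digest, "big") % 9_000_000_000 + 1_000_000_000

salt = b"change-me-strong-salt"  # read from OMOP_ID_SALT in practice
assert hashed_person_id("PAT123", salt) == hashed_person_id("PAT123", salt)  # stable
```

Even with hashing, keep writing the results into staging.id_map_* so ids survive a change of hash function or salt.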
- Vocabulary tables
- Reference dimensions: `care_site`, `provider`, `location`
- `person`, `observation_period`
- `visit_occurrence`, `visit_detail`
- Clinical facts: `condition_occurrence`, `procedure_occurrence`, `drug_exposure`, `measurement`, `observation`, `device_exposure`, `specimen`, `death`
- `cost` and other auxiliaries
- Eras
import os  # password is read from the environment, never from config

def write_df(df, table, mode="append"):
    # cfg: parsed etl/config/etl_config.yml
    (df.write
        .format("jdbc")
        .option("url", cfg["target"]["jdbc_url"])
        .option("dbtable", f"{cfg['target']['cdm_schema']}.{table}")
        .option("user", cfg["target"]["db_user"])
        .option("password", os.environ[cfg["target"]["db_password_env"]])
        .option("driver", "org.postgresql.Driver")
        .mode(mode)
        .save())

- If the source provides last_modified/update_date columns, implement watermark-based extraction; track a high watermark per table; upsert via staging and MERGE (or delete/insert per natural key + date).
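The watermark logic can be sketched in plain Python; the dict below stands in for a persistent watermark table (e.g., a hypothetical staging.etl_watermarks), and ISO date strings compare correctly lexicographically:

```python
watermarks = {}  # table -> high watermark (ISO date string); persisted in practice

def rows_to_extract(table, rows):
    """Return only rows newer than the stored watermark, then advance it."""
    wm = watermarks.get(table, "0000-00-00")
    fresh = [r for r in rows if r["update_date"] > wm]
    if fresh:
        watermarks[table] = max(r["update_date"] for r in fresh)
    return fresh

batch = [{"id": 1, "update_date": "2024-01-01"}, {"id": 2, "update_date": "2024-03-05"}]
print(len(rows_to_extract("DIAGNOSIS", batch)))  # 2 on the first run
print(len(rows_to_extract("DIAGNOSIS", batch)))  # 0 on a re-run (already loaded)
```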
- All rejects (constraint violations, unmapped codes, invalid dates) go to staging.rejects_<table> with a reason/category.
- Summarize rejects per batch in logs and docs/decisions_log.md.
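A sketch of the reject routing, assuming a list of (predicate, reason) checks per table (check names below are illustrative, not the project's actual rules):

```python
def split_rejects(records, checks):
    """checks: list of (predicate, reason); the first failing predicate wins."""
    accepted, rejects = [], []
    for rec in records:
        for predicate, reason in checks:
            if not predicate(rec):
                rejects.append({**rec, "reject_reason": reason})
                break
        else:
            accepted.append(rec)
    return accepted, rejects

checks = [
    (lambda r: r.get("concept_id", 0) != 0, "unmapped_code"),
    (lambda r: r.get("start_date") is not None, "missing_date"),
]
ok, bad = split_rejects(
    [{"concept_id": 443454, "start_date": "2024-01-01"}, {"concept_id": 0}], checks
)
print(len(ok), len(bad))  # 1 1
```

Writing the reject reason onto each row keeps the quarantine reproducible, as the conventions above require.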
- Primary key uniqueness (no duplicates).
- Not-null fields populated per CDM DDLs.
- Foreign key integrity (no orphans).
- Temporal logic (start_date ≤ end_date; within observation period where applicable).
- Compare source counts vs OMOP per domain; acceptable deltas documented (e.g., DIAGNOSIS → condition_occurrence within ±1% after de-duplication and domain routing).
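The reconciliation check can be sketched as a per-domain delta comparison against the documented tolerance (counts below are invented):

```python
def reconcile(source_counts, cdm_counts, tolerance=0.01):
    """Flag domains whose source-vs-CDM row-count delta exceeds the tolerance."""
    out_of_band = {}
    for domain, src in source_counts.items():
        cdm = cdm_counts.get(domain, 0)
        delta = abs(cdm - src) / src if src else 0.0
        if delta > tolerance:
            out_of_band[domain] = (src, cdm, round(delta, 4))
    return out_of_band

src = {"DIAGNOSIS": 100_000, "VITAL": 50_000}
cdm = {"DIAGNOSIS": 99_500, "VITAL": 43_000}
print(reconcile(src, cdm))  # only VITAL exceeds the 1% tolerance
```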
Run DQD pointing to cdm and results schemas.
R example:
install.packages("remotes")
remotes::install_github("OHDSI/DataQualityDashboard")
library(DataQualityDashboard)
cd <- list(
dbms = "postgresql",
server = "localhost/omop",
user = "postgres",
password = Sys.getenv("OMOP_DB_PASSWORD"),
port = 5432,
schema = "cdm",
writeSchema = "results"
)
DataQualityDashboard::executeDqChecks(connectionDetails = DatabaseConnector::createConnectionDetails(
dbms = cd$dbms, server = cd$server, user = cd$user, password = cd$password, port = cd$port
), cdmDatabaseSchema = cd$schema, resultsDatabaseSchema = cd$writeSchema, numThreads = 4)

remotes::install_github("OHDSI/Achilles")
library(Achilles)
cd <- list(
dbms = "postgresql",
server = "localhost/omop",
user = "postgres",
password = Sys.getenv("OMOP_DB_PASSWORD"),
port = 5432,
cdmSchema = "cdm",
resultsSchema = "results"
)
Achilles(
cdmDatabaseSchema = cd$cdmSchema,
resultsDatabaseSchema = cd$resultsSchema,
connectionDetails = DatabaseConnector::createConnectionDetails(
dbms = cd$dbms, server = cd$server, user = cd$user, password = cd$password, port = cd$port
),
numThreads = 4,
createTable = TRUE
)

- 0 primary key duplicate violations; 0 FK violations; 0 not-null violations.
- DQD passes ≥95% checks (configurable), with all critical failures triaged or resolved.
- Achilles completes without fatal errors; key summary stats reasonable.
- Reconciliation: domain-level counts within documented deltas; spot-check samples map to correct Standard concepts and domains.
- Provenance:
*_type_concept_idand*_source_valuepopulated across domains.
- Never log PHI or raw identifiers; mask or hash in logs.
- Secrets via environment variables (OMOP_DB_PASSWORD, OMOP_ID_SALT).
- Record exact versions of vocabularies, DDLs, and ETL code; store in docs/decisions_log.md.
export OMOP_DB_PASSWORD=postgres
export OMOP_ID_SALT="change-me-strong-salt"
# 1) Create schemas
psql postgresql://postgres:$OMOP_DB_PASSWORD@localhost:5432/omop -f etl/sql/create_schemas.sql
# 2) Load CDM DDLs (download from CommonDataModel)
psql postgresql://postgres:$OMOP_DB_PASSWORD@localhost:5432/omop -f etl/sql/cdm_ddl_postgresql.sql
# 3) Load vocabularies (adjust paths)
psql postgresql://postgres:$OMOP_DB_PASSWORD@localhost:5432/omop -f etl/sql/vocab_load.sql

python -m etl.spark.load_person --config etl/config/etl_config.yml
python -m etl.spark.load_visits --config etl/config/etl_config.yml
python -m etl.spark.load_condition --config etl/config/etl_config.yml
python -m etl.spark.load_procedure --config etl/config/etl_config.yml
python -m etl.spark.load_drug --config etl/config/etl_config.yml
python -m etl.spark.load_measurement --config etl/config/etl_config.yml
python -m etl.spark.load_observation --config etl/config/etl_config.yml
python -m etl.spark.load_death --config etl/config/etl_config.yml
python -m etl.spark.load_cost --config etl/config/etl_config.yml
python -m etl.spark.build_eras --config etl/config/etl_config.yml

Rscript -e "remotes::install_github('OHDSI/DataQualityDashboard'); DataQualityDashboard::executeDqChecks(...)"
Rscript -e "remotes::install_github('OHDSI/Achilles'); Achilles::Achilles(...)"

- Re-run domain loads with write_mode=overwrite on staging only; do not overwrite cdm without backup.
- Use sql/checks/*.sql to quickly detect PK/FK/not-null issues.
- Bootstrap environment; load CDM DDLs and vocabularies.
- Source profiling: schemas, value distributions, code systems.
- Build docs/mapping_spec.xlsx and etl/mappings/* for encounters, type concepts, units, value sets.
- Implement IDs: mapping tables and sequences.
- Implement person, observation_period, visit_occurrence (+ visit_detail if applicable).
- Implement domain ETLs: conditions, procedures, drugs, measurements, observations, death, cost.
- Generate eras via OHDSI SQL.
- Run DQD and Achilles; iteratively fix failures.
- Final reconciliation and acceptance sign-off.
- Targeting OMOP v5.4.2 ensures best compatibility with OHDSI tooling (some tools not fully v6-ready). See CommonDataModel.
- Use OMOP-Queries for era derivations and canonical logic.
- Refer to OHDSI for packages like Achilles, DQD, HADES suite, and Usagi for mapping assistance.
- For PCORnet semantics vs OMOP domains, consult the OMOP–PCORnet slides and validate local decisions in docs/decisions_log.md.
SELECT concept_id, concept_name
FROM cdm.concept
WHERE vocabulary_id = 'Visit' AND standard_concept = 'S';

-- Use the mapping CSV to map enc_type codes to concept names, then resolve IDs by joining to cdm.concept.
-- This prevents stale hard-coded IDs.

SELECT concept_id, concept_name
FROM cdm.concept
WHERE vocabulary_id = 'Type Concept' AND standard_concept = 'S';

SELECT COUNT(*) FROM cdm.condition_occurrence WHERE condition_concept_id = 0; -- unmapped

SELECT domain_id, COUNT(*) FROM cdm.concept WHERE concept_id IN (
  SELECT DISTINCT condition_concept_id FROM cdm.condition_occurrence
) GROUP BY domain_id; -- should be Condition

This plan is designed to be executable by an engineering agent with Spark + PostgreSQL access and to produce an OMOP v5.4.2 instance that passes OHDSI-standard validations. Adhere strictly to the official DDLs and vocabulary-driven mappings to maintain compatibility with the OHDSI ecosystem.