From aafefcc6ab391afc883a8e5d736816cacdc096d4 Mon Sep 17 00:00:00 2001
From: YueWang
Date: Tue, 7 Apr 2026 17:12:48 -0700
Subject: [PATCH 1/3] Add three tables

---
 nmdc_tables/table_1.py | 169 ++++++++++++++++++++++++++++++++++++++++
 nmdc_tables/table_2.py | 102 +++++++++++++++++++++
 nmdc_tables/table_3.py | 140 +++++++++++++++++++++++++++++
 3 files changed, 411 insertions(+)
 create mode 100644 nmdc_tables/table_1.py
 create mode 100644 nmdc_tables/table_2.py
 create mode 100644 nmdc_tables/table_3.py

diff --git a/nmdc_tables/table_1.py b/nmdc_tables/table_1.py
new file mode 100644
index 00000000..a36d5f66
--- /dev/null
+++ b/nmdc_tables/table_1.py
@@ -0,0 +1,169 @@
+import requests
+import logging
+
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.functions import first
+
+
+spark = SparkSession.builder.appName("NMDC Study Pipeline").getOrCreate()
+
+BASE_URL = "https://api.microbiomedata.org"
+
+## 4 studies for testing
+STUDIES = [
+    "nmdc:sty-11-34xj1150",
+    "nmdc:sty-11-hht5sb92",
+    "nmdc:sty-11-nxrz9m96",
+    "nmdc:sty-11-pzmd0x14",
+]
+
+## Map role labels (including capitalized variants) to snake_case names
+ROLE_MAP = {
+    "Principal Investigator": "principal_investigator",
+    "principal_investigator": "principal_investigator",
+    "Methodology": "methodology",
+    "Data curation": "data_curation",
+}
+
+logging.basicConfig(level=logging.INFO)
+
+
+## helper functions to normalize role, person_id and email
+def normalize_role(role: str) -> str | None:
+    if not role:
+        return None
+    role = role.strip()
+    return ROLE_MAP.get(role, role.lower().replace(" ", "_"))
+
+
+def normalize_person_id(person: dict) -> str | None:
+    if not person:
+        return None
+
+    if person.get("orcid"):
+        return f"orcid:{person['orcid']}"
+    elif person.get("name"):
+        return f"name:{person['name']}"
+    return None
+
+
+def normalize_email(person: dict) -> str | None:
+    if not person:
+        return None
+    email = person.get("email")
+    if email:
+        return email.strip().lower()
+    return None
+
+
+# Fetch API data for a single study
+def fetch_study(study_id):
+    url = f"{BASE_URL}/studies/{study_id}"
+    try:
+        res = requests.get(url, timeout=10)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        logging.warning(f"Error fetching {study_id}: {e}")
+        return None
+
+
+# Extract study-person rows
+def extract_study_person(data):
+    if not data:
+        return []
+
+    entity_id = data.get("id")
+    rows = []
+
+    # PI
+    pi = data.get("principal_investigator", {})
+    if pi:
+        pid = normalize_person_id(pi)
+        email = normalize_email(pi)
+
+        if pid:
+            rows.append(
+                (
+                    entity_id,
+                    pid,
+                    pi.get("name"),
+                    email,
+                    "principal_investigator",
+                )
+            )
+
+    # Contributors
+    for assoc in data.get("has_credit_associations", []):
+        person = assoc.get("applies_to_person", {})
+        roles = assoc.get("applied_roles", [])
+
+        pid = normalize_person_id(person)
+        email = normalize_email(person)
+
+        if not pid:
+            continue
+
+        for role in roles:
+            role_clean = normalize_role(role)
+
+            rows.append(
+                (
+                    entity_id,
+                    pid,
+                    person.get("name"),
+                    email,
+                    role_clean,
+                )
+            )
+
+    return rows
+
+
+# Schema
+study_person_schema = StructType(
+    [
+        StructField("study_id", StringType(), True),
+        StructField("person_id", StringType(), True),
+        StructField("name", StringType(), True),
+        StructField("email", StringType(), True),
+        StructField("role", StringType(), True),
+    ]
+)
+
+
+def main():
+    logging.info("Starting NMDC Study Pipeline")
+
+    study_rdd = spark.sparkContext.parallelize(STUDIES)
+
+    def process_study(study_id):
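+        # Runs on Spark executors; a failed fetch returns None and yields no rows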
{study_id}") + data = fetch_study(study_id) + return extract_study_person(data) + + # flatMap to flatten rows + rows_rdd = study_rdd.flatMap(process_study) + + df = spark.createDataFrame(rows_rdd, schema=study_person_schema) + + study_person_spark = df.groupBy("study_id", "person_id", "role").agg( + first("name", ignorenulls=True).alias("name"), + first("email", ignorenulls=True).alias("email"), + ) + + ## Debug + study_person_spark.show(truncate=False) + study_person_spark.printSchema() + + ## output to parquet + output_path = "output/study_person" + study_person_spark.write.mode("overwrite").parquet(output_path) + + logging.info(f"Output saved to {output_path}") + + +if __name__ == "__main__": + main() diff --git a/nmdc_tables/table_2.py b/nmdc_tables/table_2.py new file mode 100644 index 00000000..25fd2d0d --- /dev/null +++ b/nmdc_tables/table_2.py @@ -0,0 +1,102 @@ +import time +import requests +import logging +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType + + +BASE_URL = "https://api.microbiomedata.org" +OUTPUT_PATH = "output/study_sample" + +STUDIES = [ + "nmdc:sty-11-34xj1150", + "nmdc:sty-11-hht5sb92", + "nmdc:sty-11-nxrz9m96", + "nmdc:sty-11-pzmd0x14", +] + + +logging.basicConfig(level=logging.INFO) + +study_sample_schema = StructType( + [ + StructField("study_id", StringType(), True), + StructField("biosample_id", StringType(), True), + ] +) + + +def normalize_biosample_id(bid): + if not bid: + return None + return bid if bid.startswith("nmdc:") else f"nmdc:{bid}" + + +def fetch_biosample_ids(study_id, retries=3): + url = f"{BASE_URL}/data_objects/study/{study_id}" + + for attempt in range(retries): + try: + res = requests.get(url, timeout=(5, 30)) + res.raise_for_status() + + try: + data = res.json() + except Exception: + logging.warning(f"{study_id}: JSON decode failed") + return [] + + biosamples = set() + + for record in data: + try: + metadata = record.get("metadata") or {} + if not isinstance(metadata, dict): + metadata = {} + + bid = record.get("biosample_id") or metadata.get("biosample_id") + + if bid: + biosamples.add(bid) + + except Exception: + continue + + return list(biosamples) + + except Exception as e: + logging.warning(f"{study_id} attempt {attempt + 1} failed: {e}") + time.sleep(2**attempt) + + return [] + + +def main(): + spark = SparkSession.builder.appName("NMDC_Table2_Extraction").getOrCreate() + study_rdd = spark.sparkContext.parallelize(STUDIES, numSlices=2) + + def process_study(study_id): + try: + biosample_ids = fetch_biosample_ids(study_id) + return [(study_id, bid) for bid in biosample_ids] + except Exception as e: + logging.error(f"Fatal error in {study_id}: {e}") + return [] + + rows_rdd = study_rdd.flatMap(process_study) + + df = spark.createDataFrame(rows_rdd, schema=study_sample_schema) + df_final = df.dropDuplicates() + + total = df_final.count() + logging.info(f"Total entries: {total}") + + df_final.show(10, truncate=False) + + df_final.write.mode("overwrite").parquet(OUTPUT_PATH) + + logging.info(f"Saved data to {OUTPUT_PATH}") + + +if __name__ == "__main__": + main() diff --git a/nmdc_tables/table_3.py b/nmdc_tables/table_3.py new file mode 100644 index 00000000..7a186d8d --- /dev/null +++ b/nmdc_tables/table_3.py @@ -0,0 +1,139 @@ +import requests +import logging +import time + +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType + + +BASE_URL = "https://api.microbiomedata.org" +OUTPUT_PATH = "output/sample_data" + +STUDIES 
+STUDIES = [
+    "nmdc:sty-11-34xj1150",
+    "nmdc:sty-11-hht5sb92",
+    "nmdc:sty-11-nxrz9m96",
+    "nmdc:sty-11-pzmd0x14",
+]
+
+logging.basicConfig(level=logging.INFO)
+
+
+sample_data_schema = StructType(
+    [
+        StructField("biosample_id", StringType(), True),
+        StructField("data_object_id", StringType(), True),
+        StructField("name", StringType(), True),
+        StructField("type", StringType(), True),
+        StructField("data_category", StringType(), True),
+        StructField("data_object_type", StringType(), True),
+        StructField("file_size_bytes", LongType(), True),
+        StructField("md5_checksum", StringType(), True),
+        StructField("url", StringType(), True),
+        StructField("was_generated_by", StringType(), True),
+    ]
+)
+
+
+def normalize_biosample_id(bid):
+    if not bid:
+        return None
+    return bid if bid.startswith("nmdc:") else f"nmdc:{bid}"
+
+
+def fetch_raw_data(study_id, retries=3):
+    url = f"{BASE_URL}/data_objects/study/{study_id}"
+
+    for attempt in range(retries):
+        try:
+            res = requests.get(url, timeout=(5, 30))
+            res.raise_for_status()
+
+            try:
+                data = res.json()
+            except Exception:
+                logging.warning(f"{study_id}: JSON decode failed")
+                return []
+
+            return data
+
+        except Exception as e:
+            logging.warning(f"{study_id} attempt {attempt + 1} failed: {e}")
+            time.sleep(2**attempt)
+
+    return []
+
+
+def extract_records(study_id):
+    try:
+        raw_data = fetch_raw_data(study_id)
+
+        results = []
+
+        for record in raw_data:
+            try:
+                metadata = record.get("metadata") or {}
+                if not isinstance(metadata, dict):
+                    metadata = {}
+
+                bid = record.get("biosample_id") or metadata.get("biosample_id")
+                bid = normalize_biosample_id(bid)
+
+                if not bid:
+                    continue
+
+                d_objects = record.get("data_objects") or []
+
+                for dobj in d_objects:
+                    try:
+                        results.append(
+                            (
+                                bid,
+                                dobj.get("id"),
+                                dobj.get("name"),
+                                dobj.get("type"),
+                                dobj.get("data_category"),
+                                dobj.get("data_object_type"),
+                                dobj.get("file_size_bytes"),
+                                dobj.get("md5_checksum"),
+                                dobj.get("url"),
+                                dobj.get("was_generated_by"),
+                            )
+                        )
+                    except Exception:
+                        continue
+
+            except Exception:
+                continue
+
+        return results
+
+    except Exception as e:
+        logging.error(f"Fatal error in {study_id}: {e}")
+        return []
+
+
+def main():
+    spark = SparkSession.builder.appName("NMDC_Table3_Extraction").getOrCreate()
+    logging.info(f"Starting Table 3 Extraction for {len(STUDIES)} studies")
+
+    study_rdd = spark.sparkContext.parallelize(STUDIES, numSlices=2)
+    rows_rdd = study_rdd.flatMap(lambda sid: extract_records(sid))
+
+    df = spark.createDataFrame(rows_rdd, schema=sample_data_schema)
+    df_final = df.dropDuplicates()
+
+    logging.info("Calculating total entries:")
+    total = df_final.count()
+    logging.info(f"Total entries: {total}")
+
+    # Debug
+    df_final.show(10, truncate=True)
+
+    # Output
+    df_final.write.mode("overwrite").parquet(OUTPUT_PATH)
+    logging.info(f"Saved Table 3 to {OUTPUT_PATH}")
+
+
+if __name__ == "__main__":
+    main()

From b910cc822e37d10678b801f65b7f608072e471f9 Mon Sep 17 00:00:00 2001
From: YueWang
Date: Wed, 8 Apr 2026 10:18:09 -0700
Subject: [PATCH 2/3] Modify table 3 error handling and driver config

---
 nmdc_tables/table_3.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/nmdc_tables/table_3.py b/nmdc_tables/table_3.py
index 7a186d8d..44075d01 100644
--- a/nmdc_tables/table_3.py
+++ b/nmdc_tables/table_3.py
@@ -115,14 +115,26 @@ def extract_records(study_id):
 
 
 def main():
-    spark = SparkSession.builder.appName("NMDC_Table3_Extraction").getOrCreate()
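+    # Raise driver memory to 4g; assumed sufficient for these four test studies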
+    spark = SparkSession.builder.appName("NMDC_Table3_Extraction").config("spark.driver.memory", "4g").getOrCreate()
+
     logging.info(f"Starting Table 3 Extraction for {len(STUDIES)} studies")
 
     study_rdd = spark.sparkContext.parallelize(STUDIES, numSlices=2)
-    rows_rdd = study_rdd.flatMap(lambda sid: extract_records(sid))
+
+    def process_study(sid):
+        try:
+            logging.debug(f"Processing {sid}")
+            return extract_records(sid)
+        except Exception as e:
+            logging.error(f"Error processing {sid}: {e}")
+            return []
+
+    rows_rdd = study_rdd.flatMap(process_study)
 
     df = spark.createDataFrame(rows_rdd, schema=sample_data_schema)
-    df_final = df.dropDuplicates()
+
+    df_final = df.dropDuplicates().cache()
 
     logging.info("Calculating total entries:")
     total = df_final.count()
@@ -131,7 +143,6 @@ logging.info(f"Total entries: {total}")
     # Debug
     df_final.show(10, truncate=True)
 
-    # Output
     df_final.write.mode("overwrite").parquet(OUTPUT_PATH)
     logging.info(f"Saved Table 3 to {OUTPUT_PATH}")
 

From d47a2b232691d0f2da97f334f4330a8d9c79b92c Mon Sep 17 00:00:00 2001
From: YueWang
Date: Wed, 8 Apr 2026 12:33:53 -0700
Subject: [PATCH 3/3] Change table 1 format and add docstrings

---
 nmdc_tables/table_1.py | 56 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 51 insertions(+), 5 deletions(-)

diff --git a/nmdc_tables/table_1.py b/nmdc_tables/table_1.py
index a36d5f66..07df2b21 100644
--- a/nmdc_tables/table_1.py
+++ b/nmdc_tables/table_1.py
@@ -1,12 +1,23 @@
-import requests
+"""NMDC Table 1 pipeline.
+
+Extracts study-person relationships from the NMDC API
+and writes a normalized parquet table.
+"""
+
 import logging
+import requests
 
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType
 from pyspark.sql.functions import first
+from pyspark.sql.types import StructField, StructType, StringType
+
+# spark = SparkSession.builder.appName("NMDC Study Pipeline").getOrCreate()
+
+
+def get_spark():
+    return SparkSession.builder.appName("NMDC Study Pipeline").getOrCreate()
 
 
-spark = SparkSession.builder.appName("NMDC Study Pipeline").getOrCreate()
 
 BASE_URL = "https://api.microbiomedata.org"
@@ -31,6 +42,12 @@
 
 ## helper functions to normalize role, person_id and email
 def normalize_role(role: str) -> str | None:
+    """
+    Normalize role string to a standardized format.
+
+    Converts role names to lowercase with underscores and maps
+    known roles using ROLE_MAP.
+    """
     if not role:
         return None
     role = role.strip()
@@ -38,28 +55,55 @@
 
 
 def normalize_person_id(person: dict) -> str | None:
+    """
+    Generate a normalized person identifier.
+
+    Uses ORCID if available, otherwise falls back to name.
+    """
     if not person:
         return None
 
     if person.get("orcid"):
         return f"orcid:{person['orcid']}"
-    elif person.get("name"):
+
+    if person.get("name"):
         return f"name:{person['name']}"
+
     return None
 
 
 def normalize_email(person: dict) -> str | None:
+    """
+    Normalize email address from person object.
+
+    Returns a lowercase, trimmed email string if present,
+    otherwise returns None.
+    """
     if not person:
         return None
+
     email = person.get("email")
     if email:
         return email.strip().lower()
+
     return None
 
 
 # Fetch API data for a single study
-def fetch_study(study_id):
+def fetch_study(study_id: str) -> dict | None:
+    """
+    Fetch study data from NMDC API.
+
+    Args:
+        study_id: NMDC study identifier.
+
+    Returns:
+        Parsed JSON response as a dictionary, or None if the request fails.
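+
+    Note: unlike tables 2 and 3, this helper does not retry failed requests.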
+ """ url = f"{BASE_URL}/studies/{study_id}" + try: res = requests.get(url, timeout=10) res.raise_for_status() @@ -136,6 +178,8 @@ def extract_study_person(data): def main(): logging.info("starting NMDC Study Pipeline") + spark = get_spark() + study_rdd = spark.sparkContext.parallelize(STUDIES) def process_study(study_id):