55from biofilter .utils .file_hash import compute_file_hash
66from biofilter .etl .mixins .entity_query_mixin import EntityQueryMixin
77from biofilter .db .models import (
8- EntityGroup ,
98 PathwayMaster ,
109) # noqa E501
1110from biofilter .etl .mixins .base_dtp import DTPBase
@@ -15,27 +14,29 @@ class DTP(DTPBase, EntityQueryMixin):
1514 def __init__ (
1615 self ,
1716 logger = None ,
17+ debug_mode = False ,
1818 datasource = None ,
19- etl_process = None ,
19+ package = None ,
2020 session = None ,
2121 use_conflict_csv = False ,
2222 ): # noqa: E501
2323 self .logger = logger
24+ self .debug_mode = debug_mode
2425 self .data_source = datasource
25- self .etl_process = etl_process
26+ self .package = package
2627 self .session = session
2728 self .use_conflict_csv = use_conflict_csv
2829
2930 # DTP versioning
3031 self .dtp_name = "dtp_reactome"
31- self .dtp_version = "1.0 .0"
32- self .compatible_schema_min = "3.0 .0"
32+ self .dtp_version = "1.1 .0"
33+ self .compatible_schema_min = "3.1 .0"
3334 self .compatible_schema_max = "4.0.0"
3435
3536 # ⬇️ -------------------------- ⬇️
3637 # ⬇️ ------ EXTRACT FASE ------ ⬇️
3738 # ⬇️ -------------------------- ⬇️
38- def extract (self , raw_dir : str , force_steps : bool ):
39+ def extract (self , raw_dir : str ):
3940 """
4041 Downloads Reactome data. Uses the hash of 'ReactomePathways.txt' as
4142 reference. Only proceeds with full extraction if the hash has changed.
@@ -59,12 +60,6 @@ def extract(self, raw_dir: str, force_steps: bool):
5960 "Ensembl2Reactome.txt" ,
6061 "UniProt2Reactome.txt" ,
6162 ]
62- if force_steps :
63- last_hash = ""
64- msg = "Ignoring hash check."
65- self .logger .log (msg , "WARNING" )
66- else :
67- last_hash = self .etl_process .raw_data_hash
6863
6964 try :
7065 # Landing directory
@@ -86,10 +81,6 @@ def extract(self, raw_dir: str, force_steps: bool):
8681
8782 # Step 2: Compute hash and compare
8883 current_hash = compute_file_hash (file_path )
89- if current_hash == last_hash :
90- msg = f"No change detected in { main_file } " # noqa: E501
91- self .logger .log (msg , "INFO" )
92- return False , msg , current_hash # Skip further downloads
9384
9485 # Step 3: Download the remaining files
9586 for file_name in files_to_download :
@@ -163,9 +154,11 @@ def transform(self, raw_dir: str, processed_dir: str):
163154 output_file_master = output_path / "master_data"
164155
165156 # Save filtered pathways
166- df_pathways .to_csv (
167- output_file_master .with_suffix (".csv" ), index = False
168- ) # noqa: E501
157+ if self .debug_mode :
158+ df_pathways .to_csv (
159+ output_file_master .with_suffix (".csv" ), index = False
160+ ) # noqa: E501
161+
169162 df_pathways .to_parquet (
170163 output_file_master .with_suffix (".parquet" ), index = False
171164 )
@@ -311,9 +304,11 @@ def transform(self, raw_dir: str, processed_dir: str):
311304 output_file_relationship = output_path / "relationship_data"
312305
313306 # Save relationship pathways
314- df_relations .to_csv (
315- output_file_relationship .with_suffix (".csv" ), index = False
316- ) # noqa: E501
307+ if self .debug_mode :
308+ df_relations .to_csv (
309+ output_file_relationship .with_suffix (".csv" ), index = False
310+ ) # noqa: E501
311+
317312 df_relations .to_parquet (
318313 output_file_relationship .with_suffix (".parquet" ), index = False
319314 )
@@ -324,17 +319,17 @@ def transform(self, raw_dir: str, processed_dir: str):
324319 ) # noqa: E501
325320
326321 msg = f"✅ Finished transforming { self .data_source .name } data."
327- return None , True , msg
322+ return True , f" { len ( df_pathways ) } pathways processed"
328323
329324 except Exception as e :
330- msg = f"❌ ETL transform failed: { str (e )} "
325+ msg = f"❌ Transform failed: { str (e )} "
331326 self .logger .log (msg , "ERROR" )
332- return None , False , msg
327+ return False , msg
333328
334329 # 📥 ------------------------ 📥
335330 # 📥 ------ LOAD FASE ------ 📥
336331 # 📥 ------------------------ 📥
337- def load (self , df = None , processed_dir = None , chunk_size = 100_000 ):
332+ def load (self , processed_dir = None ):
338333
339334 msg = f"📥 Loading { self .data_source .name } data into the database..."
340335 self .logger .log (
@@ -347,91 +342,66 @@ def load(self, df=None, processed_dir=None, chunk_size=100_000):
347342
348343 total_pathways = 0
349344 total_warnings = 0
350- load_status = False
351345
352- data_source_id = self .data_source .id
346+ # ALIASES MAP FROM PROCESS DATA FIELDS
347+ self .alias_schema = {
348+ "reactome_id" : ("code" , "Reactome" , True ),
349+ "pathway_name" : ("name" , "Reactome" , None ),
350+ }
353351
354- # Set DB and drop indexes
352+ # READ PROCESSED DATA TO LOAD
355353 try :
356- index_specs = [
357- ("pathway_masters" , ["entity_id" ]),
358- # Já possui index=True + unique=True, mas bom explicitar
359- ("pathway_masters" , ["pathway_id" ]),
360- ("pathway_masters" , ["data_source_id" ]),
361- ]
362-
363- index_specs_entity = [
364- # Entity
365- ("entities" , ["group_id" ]),
366- ("entities" , ["has_conflict" ]),
367- ("entities" , ["is_deactive" ]),
368- # EntityName
369- ("entity_names" , ["entity_id" ]),
370- ("entity_names" , ["name" ]),
371- ("entity_names" , ["data_source_id" ]),
372- ("entity_names" , ["data_source_id" , "name" ]),
373- ("entity_names" , ["data_source_id" , "entity_id" ]),
374- ("entity_names" , ["entity_id" , "is_primary" ]),
375- # EntityRelationship
376- ("entity_relationships" , ["entity_1_id" ]),
377- ("entity_relationships" , ["entity_2_id" ]),
378- ("entity_relationships" , ["relationship_type_id" ]),
379- ("entity_relationships" , ["data_source_id" ]),
380- (
381- "entity_relationships" ,
382- ["entity_1_id" , "relationship_type_id" ],
383- ), # noqa E501
384- (
385- "entity_relationships" ,
386- ["entity_1_id" , "entity_2_id" , "relationship_type_id" ],
387- ), # noqa E501
388- # EntityRelationshipType
389- ("entity_relationship_types" , ["code" ]),
390- ]
354+ # Check if processed dir was set
355+ if not processed_dir :
356+ msg = "⚠️ processed_dir MUST be provided."
357+ self .logger .log (msg , "ERROR" )
358+ return False , msg # ⧮ Leaving with ERROR
391359
392- self . db_write_mode ()
393- # self.drop_indexes(index_specs) # Keep indices to improve checks
394- except Exception as e :
395- total_warnings += 1
396- msg = f"⚠️ Failed to switch DB to write mode or drop indexes: { e } "
397- self . logger . log ( msg , "WARNING" )
360+ processed_path = os . path . join (
361+ processed_dir ,
362+ self . data_source . source_system . name ,
363+ self . data_source . name ,
364+ )
365+ processed_file_name = processed_path + "/master_data.parquet"
398366
399- if df is None :
400- if not processed_dir :
401- msg = "Either 'df' or 'processed_path' must be provided."
367+ if not os .path .exists (processed_file_name ):
368+ msg = f"⚠️ File not found: { processed_file_name } "
402369 self .logger .log (msg , "ERROR" )
403- return total_pathways , load_status , msg
370+ return False , msg # ⧮ Leaving with ERROR
404371
405- processed_path = self .get_path (processed_dir )
406- processed_data = str (processed_path / "master_data.parquet" )
372+ df = pd .read_parquet (processed_file_name , engine = "pyarrow" )
407373
408- if not os . path . exists ( processed_data ) :
409- msg = f"File not found: { processed_data } "
374+ if df . empty :
375+ msg = "DataFrame is empty. "
410376 self .logger .log (msg , "ERROR" )
411- return total_pathways , load_status , msg
377+ return False , msg
412378
413- self .logger .log (
414- f"📥 Reading data in chunks from { processed_data } " , "INFO"
415- ) # noqa E501
379+ df .fillna ("" , inplace = True )
416380
417- df = pd .read_parquet (processed_data , engine = "pyarrow" )
381+ except Exception as e :
382+ msg = f"⚠️ Failed to try read data: { e } "
383+ self .logger .log (msg , "ERROR" )
384+ return False , msg # ⧮ Leaving with ERROR
418385
419- # Get Entity Group ID
420- if not hasattr (self , "entity_group" ) or self .entity_group is None :
421- group = (
422- self .session .query (EntityGroup )
423- .filter_by (name = "Pathways" )
424- .first () # noqa: E501
425- ) # noqa: E501
426- if not group :
427- msg = "EntityGroup 'Pathways' not found in the database."
428- self .logger .log (msg , "ERROR" )
429- return total_pathways , load_status
430- # raise ValueError(msg)
431- self .entity_group = group .id
432- msg = f"EntityGroup ID for 'Pathways' is { self .entity_group } "
433- self .logger .log (msg , "DEBUG" )
386+ # Set DB and drop indexes
387+ try :
388+ self .db_write_mode ()
389+ self .drop_indexes (self .get_pathway_index_specs )
390+ self .drop_indexes (self .get_entity_index_specs )
391+ except Exception as e :
392+ total_warnings += 1
393+ msg = f"⚠️ Failed to switch DB to write mode or drop indexes: { e } "
394+ self .logger .log (msg , "WARNING" )
395+ return False , msg # ⧮ Leaving with ERROR
396+
397+ # GET ENTITY GROUP ID AND OMICS STATUS
398+ try :
399+ self .get_entity_group ("Pathways" )
400+ except Exception as e :
401+ msg = f"Error on DTP to get Entity Group: { e } "
402+ return False , msg # ⧮ Leaving with ERROR
434403
404+ # RUN LOAD BY ROW
435405 try :
436406 # Interaction to each Reactome Pathway
437407 for _ , row in df .iterrows ():
@@ -444,16 +414,37 @@ def load(self, df=None, processed_dir=None, chunk_size=100_000):
444414 self .logger .log (msg , "WARNING" )
445415 continue
446416
417+ # --- ALIASES STRUCTURE ---
418+ # Create a dict of Aliases
419+ alias_dict = self .build_alias (row )
420+ # Only primary Name
421+ is_primary_alias = next (
422+ (a for a in alias_dict if a .get ("is_primary" )), None
423+ )
424+ # Only Aliases Names
425+ not_primary_alias = [
426+ a for a in alias_dict if a != is_primary_alias
427+ ] # noqa E501
428+
447429 # Add or Get Entity
448430 entity_id , _ = self .get_or_create_entity (
449- name = pathway_master ,
431+ name = is_primary_alias [ "alias_value" ] ,
450432 group_id = self .entity_group ,
451433 data_source_id = self .data_source .id ,
434+ package_id = self .package .id ,
435+ alias_type = is_primary_alias ["alias_type" ],
436+ xref_source = is_primary_alias ["xref_source" ],
437+ alias_norm = is_primary_alias ["alias_norm" ],
438+ is_active = True ,
452439 )
453440
454- # Add or Get Entity Name
455441 self .get_or_create_entity_name (
456- entity_id , pathway_name , data_source_id = self .data_source .id
442+ group_id = self .entity_group ,
443+ entity_id = entity_id ,
444+ aliases = not_primary_alias ,
445+ is_active = True ,
446+ data_source_id = self .data_source .id , # noqa E501
447+ package_id = self .package .id ,
457448 )
458449
459450 # Check if the pathway already exists
@@ -471,7 +462,7 @@ def load(self, df=None, processed_dir=None, chunk_size=100_000):
471462 entity_id = entity_id ,
472463 pathway_id = pathway_master ,
473464 description = pathway_name ,
474- data_source_id = data_source_id ,
465+ data_source_id = self . data_source . id ,
475466 )
476467
477468 self .session .add (pathway )
@@ -482,29 +473,23 @@ def load(self, df=None, processed_dir=None, chunk_size=100_000):
482473 except Exception as e :
483474 msg = f"❌ ETL load_relations failed: { str (e )} "
484475 self .logger .log (msg , "ERROR" )
485- return 0 , False , msg
476+ return False , msg
486477
487478 # Set DB to Read Mode and Create Index
488479 try :
489- # Drop Indexs
490- self .drop_indexes (index_specs )
491- self .drop_indexes (index_specs_entity )
492- # Stating Indexs
493- self .create_indexes (index_specs )
494- self .create_indexes (index_specs_entity )
480+ self .create_indexes (self .get_pathway_index_specs )
481+ self .create_indexes (self .get_entity_index_specs )
495482 self .db_read_mode ()
496483 except Exception as e :
497484 total_warnings += 1
498485 msg = f"Failed to switch DB to write mode or drop indexes: { e } "
499486 self .logger .log (msg , "WARNING" )
500487
501- load_status = True
502-
503488 if total_warnings != 0 :
504489 msg = f"{ total_warnings } warning to analysis in log file"
505490 self .logger .log (msg , "WARNING" )
506491
507492 msg = f"📥 Total Pathways: { total_pathways } " # noqa E501 # noqa E501
508493 self .logger .log (msg , "INFO" )
509494
510- return total_pathways , True , msg
495+ return True , msg
0 commit comments