Skip to content

Commit 2efb09f

Browse files
committed
Loads logs
1 parent 0dc539e commit 2efb09f

12 files changed

Lines changed: 989 additions & 14 deletions

biofilter/etl/dtps/dtp_variant_ncbi.py

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ def load(self, processed_dir=None):
446446
target_chrom = self._extract_chrom_from_ds(self.data_source.name) # noqa E501
447447
result = self.session.execute(
448448
text("""
449-
SELECT variant_id, entity_id
449+
SELECT rs_id, entity_id
450450
FROM variant_masters
451451
WHERE chromosome = :chrom
452452
"""),
@@ -599,7 +599,7 @@ def map_entrez_to_entity(entrez_list, gene_dict):
599599
# ---= Inserir Variants as Master in bulk =----
600600
variant_objects = [
601601
VariantMaster(
602-
variant_id=row.rs_id,
602+
rs_id=row.rs_id,
603603
variant_type=row.variant_type,
604604
omic_status_id=1,
605605
chromosome=row.chromosome,
@@ -617,7 +617,7 @@ def map_entrez_to_entity(entrez_list, gene_dict):
617617

618618
# --- Create Variant Master ID Column ---
619619
rsid_to_variant_master_id = {
620-
variant.variant_id: variant.id for variant in variant_objects
620+
variant.rs_id: variant.id for variant in variant_objects
621621
}
622622
df["variant_master_id"] = df["rs_id"].map(rsid_to_variant_master_id)
623623

@@ -673,9 +673,15 @@ def map_entrez_to_entity(entrez_list, gene_dict):
673673
)
674674

675675
# --Inserir VariantLocus
676+
build = row.assembly
677+
build = build.replace("GRCh", "").split(".")[0] # '38'
678+
676679
locus_records.append(
677680
VariantLocus(
678681
variant_id=row.variant_master_id,
682+
rs_id=row.rs_id,
683+
entity_id=row.entity_id,
684+
build=build,
679685
assembly_id=row.assembly_id,
680686
chromosome=row.chromosome,
681687
start_pos=row.start_pos,
@@ -688,33 +694,100 @@ def map_entrez_to_entity(entrez_list, gene_dict):
688694
)
689695

690696
# - Processar placements (se existirem)
697+
# placements = getattr(row, "placements", []) or []
698+
# for p in placements:
699+
# p_acc = p.get("seq_id")
700+
# asm_id = assemblies_map.get(p_acc)
701+
# if not asm_id:
702+
# continue
703+
# p_start = p.get("start_pos")
704+
# p_end = p.get("end_pos")
705+
# if not p_start or not p_end:
706+
# continue
707+
708+
# ref = p.get("ref")
709+
# alt = p.get("alt")
710+
# if not alt or alt == ref:
711+
# continue
712+
713+
# chrom = acc2chrom.get(p_acc)
714+
# ref_json = json.dumps([str(ref)]) if ref else json.dumps([])
715+
# alt_json = json.dumps([str(alt)]) if alt else json.dumps([])
716+
717+
# build = p.get("assembly")
718+
# build = build.replace("GRCh", "").split(".")[0]
719+
720+
# placement_locus.append(
721+
# VariantLocus(
722+
# variant_id=row.variant_master_id,
723+
# rs_id=row.rs_id,
724+
# entity_id=row.entity_id,
725+
# build=build,
726+
# assembly_id=asm_id,
727+
# chromosome=chrom,
728+
# start_pos=int(p_start),
729+
# end_pos=int(p_end),
730+
# reference_allele=ref_json,
731+
# alternate_allele=alt_json,
732+
# data_source_id=self.data_source.id,
733+
# etl_package_id=self.package.id,
734+
# )
735+
# )
691736
placements = getattr(row, "placements", []) or []
737+
738+
# Acumulador por locus: (assembly_id, chrom, start, end, ref) -> {build, alts:set}
739+
agg = {}
740+
692741
for p in placements:
693742
p_acc = p.get("seq_id")
694743
asm_id = assemblies_map.get(p_acc)
695744
if not asm_id:
696745
continue
746+
697747
p_start = p.get("start_pos")
698748
p_end = p.get("end_pos")
699749
if not p_start or not p_end:
700750
continue
701751

702752
ref = p.get("ref")
703753
alt = p.get("alt")
754+
# ignorar alt vazio ou igual ao ref (sem variação)
704755
if not alt or alt == ref:
705756
continue
706757

707758
chrom = acc2chrom.get(p_acc)
759+
if not chrom:
760+
continue
761+
762+
# build: "GRCh38.p14" -> "38"
763+
build = p.get("assembly") or ""
764+
build = build.replace("GRCh", "").split(".")[0]
765+
766+
key = (asm_id, chrom, int(p_start), int(p_end), str(ref or ""))
767+
768+
bucket = agg.get(key)
769+
if bucket is None:
770+
bucket = {"build": build, "alts": set()}
771+
agg[key] = bucket
772+
773+
bucket["alts"].add(str(alt))
774+
775+
# Agora, gerar UMA linha por locus com alts agregados
776+
for (asm_id, chrom, start, end, ref), bucket in agg.items():
708777
ref_json = json.dumps([str(ref)]) if ref else json.dumps([])
709-
alt_json = json.dumps([str(alt)]) if alt else json.dumps([])
778+
# ordena para estabilidade determinística
779+
alt_json = json.dumps(sorted(bucket["alts"]))
710780

711781
placement_locus.append(
712782
VariantLocus(
713783
variant_id=row.variant_master_id,
784+
rs_id=row.rs_id,
785+
entity_id=row.entity_id,
786+
build=bucket["build"],
714787
assembly_id=asm_id,
715788
chromosome=chrom,
716-
start_pos=int(p_start),
717-
end_pos=int(p_end),
789+
start_pos=start,
790+
end_pos=end,
718791
reference_allele=ref_json,
719792
alternate_allele=alt_json,
720793
data_source_id=self.data_source.id,
@@ -759,6 +832,9 @@ def map_entrez_to_entity(entrez_list, gene_dict):
759832
for loc in all_loci:
760833
key = (
761834
loc.variant_id,
835+
loc.rs_id,
836+
loc.entity_id,
837+
loc.build,
762838
loc.assembly_id,
763839
loc.chromosome,
764840
loc.start_pos,
@@ -773,6 +849,8 @@ def map_entrez_to_entity(entrez_list, gene_dict):
773849
unique_loci.append(loc)
774850
self.session.bulk_save_objects(unique_loci)
775851

852+
# TODO: no chromossomo Y temos dados do X
853+
776854
# manda para a DB
777855
self.session.commit()
778856

roteiro

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
full process
2+
y - DONE
3+
x - DONE
4+
22 - DONE
5+
21 - DONE
6+
20 - DONE
7+
19 - DONE
8+
18 - DONE
9+
17 - DONE
10+
16 - DONE
11+
15 - DONE
12+
14 - DONE
13+
13 - DONE
14+
12 - DONE
15+
11 - DONE
16+
10 - DONE
17+
09 - Loading
18+
08 - Loading
19+
07 - extracting
20+
06 - wait to transform
21+
05 - wait to load
22+
04 - Loading
23+
03 - wait to transform
24+
02 - Loading
25+
01 - DONE

scripts/Run_ETLManager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# Variants
2929
# --------
3030
# "dbsnp_sample",
31-
"dbsnp_chr1",
31+
# "dbsnp_chr1",
3232
# "dbsnp_chr2",
3333
# "dbsnp_chr3",
3434
# "dbsnp_chr4",
@@ -51,7 +51,7 @@
5151
# "dbsnp_chr21",
5252
# "dbsnp_chr22",
5353
# "dbsnp_chrx",
54-
# "dbsnp_chry",
54+
"dbsnp_chry",
5555
# "dbsnp_chrmt",
5656
# "gwas",
5757
#

scripts/carga_02_load.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from biofilter import Biofilter
2+
3+
# db_uri = "sqlite:///biofilter.db"
4+
db_uri = "postgresql+psycopg2://bioadmin:bioadmin@localhost/biofilter"
5+
6+
# Configure below
7+
data_sources_to_process = [
8+
# Genes
9+
# -----
10+
# "hgnc",
11+
# "gene_ncbi",
12+
# "ensembl",
13+
# "gene_ncbi",
14+
#
15+
# Proteins
16+
# --------
17+
# "pfam",
18+
# "uniprot",
19+
#
20+
# Pathways
21+
# --------
22+
# "reactome",
23+
# "kegg_pathways",
24+
#
25+
# Gene Ontology
26+
# -------------
27+
# "gene_ontology",
28+
#
29+
# Variants
30+
# --------
31+
# "dbsnp_sample",
32+
# "dbsnp_chr1",
33+
"dbsnp_chr2",
34+
# "dbsnp_chr3",
35+
# "dbsnp_chr4",
36+
# "dbsnp_chr5",
37+
# "dbsnp_chr6",
38+
# "dbsnp_chr7",
39+
# "dbsnp_chr8",
40+
# "dbsnp_chr9",
41+
# "dbsnp_chr10",
42+
# "dbsnp_chr11",
43+
# "dbsnp_chr12",
44+
# "dbsnp_chr13",
45+
# "dbsnp_chr14",
46+
# "dbsnp_chr15",
47+
# "dbsnp_chr16",
48+
# "dbsnp_chr17",
49+
# "dbsnp_chr18",
50+
# "dbsnp_chr19",
51+
# "dbsnp_chr21",
52+
# "dbsnp_chr21",
53+
# "dbsnp_chr22",
54+
# "dbsnp_chrx",
55+
# "dbsnp_chry",
56+
# "dbsnp_chrmt",
57+
# "gwas",
58+
#
59+
# RelationShips
60+
# -------------
61+
# "reactome_relationships",
62+
# "uniprot_relationships",
63+
#
64+
# DISEASE
65+
# -------
66+
# "mondo",
67+
# "mondo_relationships",
68+
#
69+
# CHEMICAL
70+
# --------
71+
# "chebi",
72+
]
73+
74+
run_steps = [
75+
# "extract",
76+
# "transform",
77+
"load",
78+
# "all"
79+
] # noqa E501
80+
81+
if __name__ == "__main__":
82+
bf = Biofilter(db_uri, debug_mode=True)
83+
# bf = Biofilter(db_uri)
84+
85+
for source in data_sources_to_process:
86+
for step in run_steps:
87+
if step != "all":
88+
try:
89+
print(f"▶ Running ETL - Source: {source} | Step: {step}")
90+
bf.update(
91+
data_sources=[source],
92+
run_steps=[step],
93+
force_steps=[step],
94+
)
95+
except Exception as e:
96+
print(f"❌ Error processing {source} [{step}]: {e}")
97+
elif step == "all":
98+
try:
99+
print(f"▶ Running ETL - Source: {source} | Step: {step}")
100+
bf.update(
101+
data_sources=[source],
102+
# run_steps=[step],
103+
# force_steps=[step],
104+
)
105+
except Exception as e:
106+
print(f"❌ Error processing {source} [{step}]: {e}")
107+
108+
print("✅ All ETL tasks finished.")
109+
print("------------------------------")

0 commit comments

Comments
 (0)