-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
417 lines (353 loc) · 15.9 KB
/
utils.py
File metadata and controls
417 lines (353 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import streamlit as st
import requests
import yaml
import pandas as pd
import io
from xml.etree import ElementTree
import time
import numpy as np
from psims.mzml.writer import MzMLWriter
from io import BytesIO
import logging
def write_job_params(task_id: str):
    """Fetch and display the workflow parameters for a GNPS2 task.

    Resolves the GNPS2 server from the task-id prefix (``DEV-`` / ``BETA-``),
    downloads ``job_parameters.yaml`` plus the merge parameters, renders a
    summary via ``st.info``, and returns the parsed YAML dict with the
    heatmap ``bin_size`` injected.

    Parameters:
        task_id (str): GNPS2 task id, optionally prefixed with ``DEV-`` or ``BETA-``.

    Returns:
        dict: Parsed job parameters with an added ``bin_size`` key.

    Raises:
        requests.HTTPError: If the parameter downloads fail (other than the
            500 legacy case handled below).
    """
    if task_id.startswith("DEV-"):
        base_url = "http://dev.gnps2.org:4000"
        task_id = task_id.replace("DEV-", "")
    elif task_id.startswith("BETA-"):
        base_url = "https://beta.gnps2.org"
        task_id = task_id.replace("BETA-", "")
    else:
        base_url = "https://gnps2.org"

    params_url = f"{base_url}/resultfile?task={task_id}&file=job_parameters.yaml"
    merge_params_url = f"{base_url}/resultfile?task={task_id}&file=nf_output/merge_parameters.txt"

    r = requests.get(params_url, timeout=(60, 60))
    r.raise_for_status()
    yaml_content = yaml.safe_load(r.text)

    if yaml_content.get('input_metadata_file') != 'NO_FILE':
        metadata_provided = 'Yes'
    else:
        metadata_provided = 'No'

    if yaml_content.get('database_search_mass_range_lower') is not None and \
       yaml_content.get('database_search_mass_range_upper') is not None:
        protein_mass_range = f"({yaml_content.get('database_search_mass_range_lower')}, {yaml_content.get('database_search_mass_range_upper')})"
    else:
        protein_mass_range = 'Unknown Parameter'

    # Get bin size from the merge parameters
    r = requests.get(merge_params_url, timeout=(60, 60))
    if r.status_code == 500:
        # Old tasks pre-date the merge-parameters file; the bin size was
        # hard-coded to 10.0 back then.
        bin_size = 10.0
    else:
        r.raise_for_status()
        merge_params = yaml.safe_load(r.text)
        bin_size = merge_params.get('bin_size', None)
        if bin_size is None:
            st.error("Bin size not found in merge parameters. Please re-run the task.")
            st.stop()
    yaml_content['bin_size'] = bin_size

    # BUGFIX: corrected the "Unkown Parameter" typos in the user-facing text.
    st.info(f"""
    **Workflow Parameters:** \n
    **Description:** {yaml_content.get('description', '')}
    **Distance Measure:** {yaml_content.get('distance', 'Unknown Parameter')}
    **Distance Calculation Bin Size:** {yaml_content.get('search_bin_size', 'Unknown Parameter')} Da
    **Knowledgebase Search Threshold:** {yaml_content.get('database_search_threshold', 'Unknown Parameter')}
    **Protein Knowledgebase Search Mass Range:** {protein_mass_range}
    **Metadata File Provided:** {metadata_provided}
    **Heatmap Bin Size:** {bin_size} Da
    """)
    return yaml_content
def write_warnings(param_url: str) -> None:
    """Download the input-file warnings CSV from ``param_url`` and render it.

    Splits rows into warnings and critical errors (via the 'Error_Level'
    column when present) and shows each group in its own Streamlit expander.
    A 500 response is treated as "old task with no warnings file".

    Parameters:
        param_url (str): Result-file URL of the warnings CSV.
    """
    r = requests.get(param_url, timeout=(60, 60))
    if r.status_code == 500:
        # Old tasks pre-date the warnings file; the server answers 500 for them.
        # (Dropped a stray f-prefix -- the message has no placeholders.)
        st.warning("**Warning:** This is an old task and warnings are not available. Please re-run the task if you want to gather warnings.")
        return None

    # No warnings. strip() also treats a whitespace-only body as empty, which
    # would otherwise crash pd.read_csv with EmptyDataError.
    if r.text.strip() == '':
        st.info("✅ No warnings or errors were found in the input files.")
        return None

    warnings_df = pd.read_csv(io.StringIO(r.text), index_col=0)
    total_warnings = len(warnings_df)
    logging.info(f"Total warnings/errors found: {total_warnings}")
    print(f"Total warnings/errors found: {total_warnings}", flush=True)
    if total_warnings == 0:
        return None

    if 'Error_Level' in warnings_df.columns:  # For backwards compatibility
        errors_df = warnings_df.loc[warnings_df['Error_Level'] == 'critical']
        warnings_df = warnings_df.loc[warnings_df['Error_Level'] != 'critical']
    else:
        # No severity column at all -> everything is a plain warning.
        errors_df = pd.DataFrame()

    if len(warnings_df) > 0:
        # For backwards compatibility of tasks: older runs used an 'Error' column.
        if 'Error' in warnings_df.columns:
            warnings_df['Warning'] = warnings_df['Error']
            warnings_df.drop(columns=['Error'], inplace=True)
        with st.expander(f":warning: View {len(warnings_df)} Input File Warnings"):
            # Show the table
            st.write(warnings_df)

    if len(errors_df) > 0:
        with st.expander(f":exclamation: View {len(errors_df)} Input File Errors"):
            st.write(errors_df)
    return None
import requests
from xml.etree import ElementTree
def get_genbank_metadata(genbank_accession: str) -> tuple:
    """
    Fetches metadata and taxonomy for a given GenBank accession number using NCBI's E-utilities API.

    Parameters:
        genbank_accession (str): The GenBank accession number.

    Returns:
        tuple: ``(metadata_dict, success_flag)`` where ``success_flag`` is 1
        when the taxonomy lookup succeeded and 0 otherwise. On failure the
        dict is either None (no record found) or filled with 'Unknown'
        placeholders (an exception occurred along the way).
    """
    # Look up the nucleotide id for the accession via esearch.
    def _fetch_metadata(accession, db):
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?"
        params = {
            "db": db,
            "term": accession,
            "retmode": "json"
        }
        response = requests.get(base_url, params=params)
        if response.status_code != 200:
            raise Exception(f"Error fetching data from NCBI: {response.status_code}")
        return response.json()

    # Fetch the taxonomy summary for a taxid via esummary.
    def _fetch_taxonomy(taxid):
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?"
        params = {
            "db": "taxonomy",
            "id": taxid,
            "retmode": "json"
        }
        response = requests.get(base_url, params=params)
        if response.status_code != 200:
            raise requests.exceptions.HTTPError(f"Error fetching taxonomy from NCBI: {response.status_code}")
        return response.json()

    # Fetch the nucleotide record summary (to recover the taxid) via esummary.
    def _fetch_assembly(assembly_id):
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?"
        params = {
            "db": "nucleotide",
            "id": assembly_id,
            "retmode": "json"
        }
        response = requests.get(base_url, params=params)
        if response.status_code != 200:
            # BUGFIX: this message previously said "taxonomy" even though this
            # helper fetches the nucleotide/assembly summary.
            raise requests.exceptions.HTTPError(f"Error fetching assembly information from NCBI: {response.status_code}")
        return response.json()

    try:
        genbank_id = None
        taxid = None
        taxonomy_json = None
        metadata_json = _fetch_metadata(genbank_accession, 'nucleotide')
        num_results = int(metadata_json['esearchresult'].get('count', 0))
        idlist = metadata_json['esearchresult'].get('idlist', [])
        if num_results > 1:
            st.warning(f"Multiple results found for GenBank accession {genbank_accession}. Using the first one.")
            # BUGFIX: previously no id was selected in this branch, even though
            # the warning promised to use the first result -- the lookup then
            # silently returned (None, 0).
            if idlist:
                genbank_id = idlist[0]
        elif num_results == 1:
            if len(idlist) > 1:
                st.warning(f"Multiple taxonomy ids found for GenBank accession {genbank_accession}. Using the first one.")
                # BUGFIX: same as above -- actually take the first id.
                genbank_id = idlist[0]
            elif len(idlist) == 1:
                genbank_id = idlist[0]
        # at this point genbank_id will be None if it was not identified

        # Fetch taxid if genbank_id is found
        if genbank_id:
            # Overwrite metadata_json with the nucleotide summary information
            metadata_json = _fetch_assembly(genbank_id)
            if 'error' in metadata_json['result'][genbank_id]:
                raise Exception(f"Error fetching assembly information for GenBank accession {genbank_accession}: {metadata_json['result'][genbank_id]['error']}")
            taxid = metadata_json['result'][genbank_id].get('taxid', None)

        # Fetch taxonomy if taxid is found
        if taxid:
            taxonomy_json = _fetch_taxonomy(taxid)
            taxonomy_json = taxonomy_json['result'][str(taxid)]
            if 'error' in taxonomy_json:
                st.warning(f"Error fetching taxonomy for GenBank accession {genbank_accession}: {taxonomy_json['error']}")
                taxonomy_json = None

        # Filter the metadata to only what we want to integrate:
        if taxonomy_json:
            output_dict = {
                'TaxID From Genbank': taxid,
                'Genbank accession': genbank_accession,
                'Genbank-Division': taxonomy_json.get('division', 'Unknown'),
                'Genbank-Scientific Name': taxonomy_json.get('scientificname', 'Unknown'),
                'Genbank-Genus': taxonomy_json.get('genus', 'Unknown'),
                'Genbank-Species': taxonomy_json.get('species', 'Unknown'),
                'Genbank-Subspecies': taxonomy_json.get('subsp', 'Unknown'),
                'Genbank-GenBank Division': taxonomy_json.get('genbankdivision', 'Unknown'),
            }
            # Normalize empty strings so downstream display shows 'None'.
            for key in output_dict:
                if output_dict[key] == '':
                    output_dict[key] = 'None'
            return output_dict, 1
        return None, 0
    except Exception:
        # Best-effort fallback: return placeholders so the caller can still
        # build a row for this accession.
        return {
            'TaxID From Genbank': taxid,
            'Genbank accession': genbank_accession,
            'Genbank-Division': 'Unknown',
            'Genbank-Scientific Name': 'Unknown',
            'Genbank-Genus': 'Unknown',
            'Genbank-Species': 'Unknown',
            'Genbank-Subspecies': 'Unknown',
            'Genbank-GenBank Division': 'Unknown',
        }, 0
@st.cache_data(max_entries=1000)
def enrich_genbank_metadata(df: pd.DataFrame) -> pd.DataFrame:
    """Enriches a DataFrame with metadata from GenBank accessions based on the 'Genbank accession' column.

    Returns a DataFrame with the metadata added as new columns. If no metadata was found for all accessions,
    the original DataFrame is returned unchanged.

    Parameters:
        df (pd.DataFrame): The DataFrame to enrich. Must have a 'Genbank accession' column.

    Returns:
        pd.DataFrame: The enriched DataFrame (same rows; extra metadata columns).
    """
    # Get the unique genbank accessions
    genbank_accessions = df['Genbank accession'].dropna().unique()

    # Get the metadata for each genbank accession
    result = []
    for accession in genbank_accessions:
        result.append(get_genbank_metadata(accession))
        time.sleep(0.1)  # To avoid NCBI rate limiting

    # Keep only successful lookups
    metadata = [r[0] for r in result if r[1] == 1]
    if not metadata:
        # No metadata was found for any accession
        return df

    # Create a DataFrame from the metadata
    metadata_df = pd.DataFrame(metadata)

    # Merge the metadata with the original DataFrame.
    # BUGFIX: use a left join so rows whose accession had no metadata (or a
    # NaN accession) are not silently dropped by the default inner join.
    df = df.merge(metadata_df, on='Genbank accession', how='left')
    return df
def custom_css():
    """Injects app-wide CSS tweaks: markdown-style button labels and larger expander headers."""
    css = """
    <style>
    .button-label {
        margin-bottom: 5px;
        font-size: 14px;
    }
    .button-label.centered {
        padding-top: 25px;
        text-align: center;
    }
    div[data-testid="stExpander"] summary p { /* Careful here, this has changed in past releases! */
        font-size: 2rem;
    }
    </style>
    """
    st.markdown(css, unsafe_allow_html=True)
def metadata_validation(metadata_table:pd.DataFrame, spectrum_df:pd.DataFrame) -> None:
    """Validates the metadata table against the spectrum file. This function checks the following:
    1. There are no duplicate values in the "Filename" column. If there are, the function will raise an error and halt execution.
    2. That the values in the "Filename" column of the metadata_table and spectrum_df match. If they do not,
    the function will display a warning.

    Any changes to this function should also be reflected in the IDBac workflow code.
    """
    # Check for duplicates in the metadata table.
    # keep=False marks *every* row of a duplicated filename so the user sees the full set.
    duplicated_rows = metadata_table[metadata_table['Filename'].duplicated(keep=False)]
    if not duplicated_rows.empty:
        st.error("The metadata table contains duplicate values in the 'Filename' column. Please remove duplicates and try again.")
        st.write("Duplicated Rows:", duplicated_rows)
        # Halt the whole Streamlit script -- duplicates make downstream joins ambiguous.
        st.stop()
    # Check that the values in the metadata table and spectrum_df match
    metadata_filenames = set(metadata_table['Filename'])
    if spectrum_df is None:
        spectrum_filenames = set()
    else:
        spectrum_filenames = set(spectrum_df['filename'])
    filenames_in_metadata_not_in_spectrum = list(metadata_filenames - spectrum_filenames)
    filenames_in_spectrum_not_in_metadata = list(spectrum_filenames - metadata_filenames)
    # Mismatches are non-fatal: surface them in collapsed expanders, gated on
    # the session-level 'show_warnings' flag (defaults to showing them).
    if len(filenames_in_metadata_not_in_spectrum) > 0:
        if st.session_state.get('show_warnings', True):
            with st.expander(":warning: Filenames in metadata table not in spectrum file:"):
                st.write(filenames_in_metadata_not_in_spectrum)
    if len(filenames_in_spectrum_not_in_metadata) > 0 and \
        spectrum_df is not None: # If none, that means there are no spectra at all and the user has already been warned
        if st.session_state.get('show_warnings', True):
            with st.expander(":warning: Filenames in spectrum file not in metadata table:"):
                st.write(filenames_in_spectrum_not_in_metadata)
def format_proteins_as_strings(df):
    """Builds one display label per row: knowledgebase hits are rendered as
    'KB Result - <strain name>', everything else falls back to the filename."""
    return [
        f"KB Result - {record['db_strain_name']}" if record['db_search_result'] else record['filename']
        for record in df.to_dict(orient="records")
    ]
def parse_numerical_input(string: str) -> list:
    """Parses a comma-separated list of non-negative numbers and ranges.

    Plain numbers become floats; bracketed/plain ranges become tuples:
    '[10-20]' -> (10.0, 20.0), '[10-]' -> (10.0, np.inf), '[-20]' -> (0.0, 20.0).

    Parameters:
        string (str): User input, e.g. "10, 20, [30-40], [50-]".

    Returns:
        list: Floats and (start, end) tuples, or [] after showing an
        st.error if any entry is negative, an invalid range, or unparseable.
    """
    entries = [entry.strip() for entry in string.split(",")]
    output_list = []
    for entry in entries:
        # Plain non-negative number.
        try:
            value = float(entry)
        except ValueError:
            value = None
        if value is not None:
            if value < 0:
                st.error(f"Negative values are not allowed: {entry}")
                return []
            output_list.append(value)
            continue
        # Range entry, optionally bracketed; open ends default to 0 / infinity.
        try:
            if "-" in entry:
                cleaned = entry.replace("[", "").replace("]", "")
                start, end = cleaned.split("-")
                start_val = 0.0 if start == "" else float(start)
                end_val = np.inf if end == "" else float(end)
                if end_val - start_val < 0:
                    # BUGFIX: previously the invalid range was still appended
                    # and parsing continued; now fail like the other error
                    # paths. (Also removed a dead `start != "" and end != ""`
                    # guard -- both were already reassigned to numbers.)
                    st.error(f"Invalid range: {entry}. The start value must be less than the end value.")
                    return []
                output_list.append((start_val, end_val))
            else:
                output_list.append(float(entry))
        except Exception as e:
            st.error(f"Could not parse entry: {entry} " + str(e))
            return []
    return output_list
def _convert_bin_to_mz(bin_name, bin_size):
b = int(bin_name.split("_")[-1])
return f"[{b * bin_size}, {(b + 1) * bin_size})"
def _convert_bin_to_mz_tuple(bin_name, bin_size):
b = int(bin_name.split("_")[-1])
return (b * bin_size, (b + 1) * bin_size)
def convert_to_mzml(spectrum_dict: dict):
    """Converts a spectrum dictionary to an mzML file using psims.

    Args:
        spectrum_dict (dict): Spectrum data; must provide "m/z array" and
            "intensity array" keys.

    Returns:
        BytesIO: The mzML file content as a BytesIO object.

    Raises:
        ValueError: If `spectrum_dict` is not a dictionary.
    """
    if not isinstance(spectrum_dict, dict):
        # BUGFIX: the message (and old docstring) referred to a non-existent
        # 'json_run' argument.
        raise ValueError("spectrum_dict must be a dictionary.")

    output_bytes = BytesIO()
    # close=False keeps the underlying BytesIO open/readable after the writer exits.
    with MzMLWriter(output_bytes, close=False) as out:
        out.controlled_vocabularies()

        # Write the metadata as user parameters
        params = {'id': 'global_metadata'}
        out.reference_param_group_list([
            params
        ])

        with out.run(id="admin_qc_download"):
            with out.spectrum_list(count=1):
                scan = 1
                mz_array = spectrum_dict["m/z array"]
                intensity_array = spectrum_dict["intensity array"]
                out.write_spectrum(
                    mz_array, intensity_array,
                    id="scan={}".format(scan),
                    params=[
                        "MS1 Spectrum",
                        {"ms level": 1},
                        {"total ion current": sum(intensity_array)}
                    ])
                # Only a single spectrum is written; the old trailing
                # `scan += 1` was dead code and has been removed.
    return output_bytes