Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion ms2rescore/config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import importlib.resources
import json
import multiprocessing as mp
import re
from argparse import Namespace
from pathlib import Path
from typing import Dict, List, Union

try:
import tomllib
except ImportError:
import tomli as tomllib
import tomli as tomllib # type: ignore

from cascade_config import CascadeConfig

Expand Down Expand Up @@ -86,6 +87,36 @@ def _validate_processes(config: Dict) -> Dict:
return config


def _validate_regular_expressions(config: Dict) -> Dict:
"""Validate regular expressions in configuration."""
for field in [
"psm_id_pattern",
"spectrum_id_pattern",
"psm_id_rt_pattern",
"psm_id_im_pattern",
]:
if config["ms2rescore"][field]:

# Check if valid regex
try:
pattern = re.compile(config["ms2rescore"][field])
except re.error as e:
raise MS2RescoreConfigurationError(
f"Invalid regular expression provided for '{field}': {e}"
) from e

# Check if regex has exactly one capturing group
if pattern.groups != 1:
raise MS2RescoreConfigurationError(
f"Regular expression for '{field}' should contain exactly one "
"capturing group. Please check and try again. "
"See https://ms2rescore.readthedocs.io/en/stable/userguide/configuration/#mapping-psms-to-spectra "
"for more information."
)

return config


def parse_configurations(configurations: List[Union[dict, str, Path, Namespace]]) -> Dict:
"""
Parse and validate MS²Rescore configuration files and CLI arguments.
Expand Down Expand Up @@ -142,6 +173,7 @@ def parse_configurations(configurations: List[Union[dict, str, Path, Namespace]]
# Validate and infer filenames and number of parallel processes
config = _validate_filenames(config)
config = _validate_processes(config)
config = _validate_regular_expressions(config)

# Convert feature_generators and rescoring_engine names to lowercase
config["ms2rescore"]["feature_generators"] = {
Expand Down
37 changes: 28 additions & 9 deletions ms2rescore/parse_psms.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,39 @@ def parse_psms(config: Dict, psm_list: Union[PSMList, None]) -> PSMList:
psm_list = _remove_invalid_aa(psm_list)
_find_decoys(psm_list, config["id_decoy_pattern"])
_calculate_qvalues(psm_list, config["lower_score_is_better"])

# Parse retention time and/or ion mobility from spectrum_id if patterns provided
if config["psm_id_rt_pattern"] or config["psm_id_im_pattern"]:
logger.debug("Parsing retention time and/or ion mobility from PSM identifier...")
_parse_values_from_spectrum_id(
psm_list, config["psm_id_rt_pattern"], config["psm_id_im_pattern"]
)

# Apply psm_id_pattern if provided
if config["psm_id_pattern"]:
pattern = re.compile(config["psm_id_pattern"])
logger.debug("Applying 'psm_id_pattern'...")
logger.debug(
f"Parsing '{psm_list[0].spectrum_id}' to '{_match_psm_ids(psm_list[0].spectrum_id, pattern)}'"
)
new_ids = [_match_psm_ids(old_id, pattern) for old_id in psm_list["spectrum_id"]]

# Validate that the number of unique IDs remains the same
if len(set(new_ids)) != len(set(psm_list["spectrum_id"])):
example_old_id = psm_list["spectrum_id"][0]
example_new_id = new_ids[0]
raise MS2RescoreConfigurationError(
"'psm_id_pattern' resulted in a different number of unique PSM IDs. "
"This might indicate issues with the regex pattern. Please check and try again. "
f"Example old ID: '{example_old_id}' -> new ID: '{example_new_id}'. "
" See "
"https://ms2rescore.readthedocs.io/en/stable/userguide/configuration/#mapping-psms-to-spectra "
"for more information."
)

# Assign new IDs
psm_list["spectrum_id"] = new_ids

# Store scoring values for comparison later
for psm in psm_list:
psm.provenance_data.update(
Expand All @@ -62,6 +89,7 @@ def parse_psms(config: Dict, psm_list: Union[PSMList, None]) -> PSMList:
}
)

# Rename and add modifications
logger.debug("Parsing modifications...")
modifications_found = set(
[
Expand All @@ -81,15 +109,6 @@ def parse_psms(config: Dict, psm_list: Union[PSMList, None]) -> PSMList:
psm_list.add_fixed_modifications(config["fixed_modifications"])
psm_list.apply_fixed_modifications()

if config["psm_id_pattern"]:
pattern = re.compile(config["psm_id_pattern"])
logger.debug("Applying 'psm_id_pattern'...")
logger.debug(
f"Parsing '{psm_list[0].spectrum_id}' to '{_match_psm_ids(psm_list[0].spectrum_id, pattern)}'"
)
new_ids = [_match_psm_ids(old_id, pattern) for old_id in psm_list["spectrum_id"]]
psm_list["spectrum_id"] = new_ids

return psm_list


Expand Down
62 changes: 47 additions & 15 deletions ms2rescore/parse_spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from typing import Optional, Set, Tuple

import numpy as np
from ms2rescore_rs import get_precursor_info
from ms2rescore_rs import Precursor, get_precursor_info
from psm_utils import PSMList

from ms2rescore.exceptions import MS2RescoreError
from ms2rescore.exceptions import MS2RescoreConfigurationError, MS2RescoreError
from ms2rescore.utils import infer_spectrum_path

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,6 +149,43 @@ def add_precursor_values(
return available_data_types


def _apply_spectrum_id_pattern(
precursors: dict[str, Precursor], pattern: str
) -> dict[str, Precursor]:
"""Apply spectrum ID pattern to precursor IDs."""
# Map precursor IDs using regex pattern
compiled_pattern = re.compile(pattern)
id_mapping = {
match.group(1): spectrum_id
for spectrum_id in precursors.keys()
if (match := compiled_pattern.search(spectrum_id)) is not None
}

# Validate that any IDs were matched
if not id_mapping:
raise MS2RescoreConfigurationError(
"'spectrum_id_pattern' did not match any spectrum-file IDs. Please check and try "
"again. See "
"https://ms2rescore.readthedocs.io/en/stable/userguide/configuration/#mapping-psms-to-spectra "
"for more information."
)

# Validate that the same number of unique IDs were matched
elif len(id_mapping) != len(precursors):
new_id, old_id = next(iter(id_mapping.items()))
raise MS2RescoreConfigurationError(
"'spectrum_id_pattern' resulted in a different number of unique spectrum IDs. This "
"indicates issues with the regex pattern. Please check and try again. "
f"Example old ID: '{old_id}' -> new ID: '{new_id}'. "
"See https://ms2rescore.readthedocs.io/en/stable/userguide/configuration/#mapping-psms-to-spectra "
"for more information."
)

precursors = {new_id: precursors[orig_id] for new_id, orig_id in id_mapping.items()}

return precursors


def _get_precursor_values(
psm_list: PSMList, spectrum_path: str, spectrum_id_pattern: Optional[str] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Expand All @@ -162,23 +199,18 @@ def _get_precursor_values(
spectrum_file = infer_spectrum_path(spectrum_path, run_name)

LOGGER.debug("Reading spectrum file: '%s'", spectrum_file)
precursors = get_precursor_info(str(spectrum_file))
precursors: dict[str, Precursor] = get_precursor_info(str(spectrum_file))

# Parse spectrum IDs with regex pattern if provided
if spectrum_id_pattern:
compiled_pattern = re.compile(spectrum_id_pattern)
precursors = {
match.group(1): precursor
for spectrum_id, precursor in precursors.items()
if (match := compiled_pattern.search(spectrum_id)) is not None
}

# Ensure all PSMs have a precursor values
precursors = _apply_spectrum_id_pattern(precursors, spectrum_id_pattern)

# Ensure all PSMs have precursor values
for psm in psm_list_run:
if psm.spectrum_id not in precursors:
raise SpectrumParsingError(
"Mismatch between PSM and spectrum file IDs. Could find precursor values "
f"for PSM with ID {psm.spectrum_id} in run {run_name}.\n"
raise MS2RescoreConfigurationError(
"Mismatch between PSM and spectrum file IDs. Could not find precursor "
f"values for PSM with ID {psm.spectrum_id} in run {run_name}.\n"
"Please check that the `spectrum_id_pattern` and `psm_id_pattern` options "
"are configured correctly. See "
"https://ms2rescore.readthedocs.io/en/stable/userguide/configuration/#mapping-psms-to-spectra"
Expand All @@ -199,6 +231,6 @@ def _get_precursor_values(


class SpectrumParsingError(MS2RescoreError):
"""Error parsing retention time from spectrum file."""
"""Error while parsing spectrum file."""

pass
96 changes: 95 additions & 1 deletion tests/test_config_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path

from ms2rescore.config_parser import _parse_output_path
import pytest

from ms2rescore.config_parser import _parse_output_path, _validate_regular_expressions
from ms2rescore.exceptions import MS2RescoreConfigurationError


def test__parse_output_path():
Expand All @@ -21,3 +24,94 @@ def test__parse_output_path():

for output_path, expected in test_cases:
assert _parse_output_path(output_path, test_psm_file) == expected


def test__validate_regular_expressions_valid():
"""Test _validate_regular_expressions with valid regex patterns."""
# Valid pattern with one capturing group
config = {
"ms2rescore": {
"psm_id_pattern": r"scan:(\d+):.*",
"spectrum_id_pattern": r"spectrum_(\d+)",
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
result = _validate_regular_expressions(config)
assert result == config


def test__validate_regular_expressions_none():
"""Test _validate_regular_expressions with None patterns."""
config = {
"ms2rescore": {
"psm_id_pattern": None,
"spectrum_id_pattern": None,
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
result = _validate_regular_expressions(config)
assert result == config


def test__validate_regular_expressions_invalid_regex():
"""Test _validate_regular_expressions with invalid regex syntax."""
config = {
"ms2rescore": {
"psm_id_pattern": r"scan:(\d+", # Missing closing parenthesis
"spectrum_id_pattern": None,
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
with pytest.raises(MS2RescoreConfigurationError, match="Invalid regular expression"):
_validate_regular_expressions(config)


def test__validate_regular_expressions_no_capturing_group():
"""Test _validate_regular_expressions with no capturing groups."""
config = {
"ms2rescore": {
"psm_id_pattern": r"scan:\d+:.*", # No capturing group
"spectrum_id_pattern": None,
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
with pytest.raises(
MS2RescoreConfigurationError, match="should contain exactly one capturing group"
):
_validate_regular_expressions(config)


def test__validate_regular_expressions_multiple_capturing_groups():
"""Test _validate_regular_expressions with multiple capturing groups."""
config = {
"ms2rescore": {
"psm_id_pattern": r"scan:(\d+):(.*)", # Two capturing groups
"spectrum_id_pattern": None,
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
with pytest.raises(
MS2RescoreConfigurationError, match="should contain exactly one capturing group"
):
_validate_regular_expressions(config)


def test__validate_regular_expressions_spectrum_id_pattern_invalid():
"""Test _validate_regular_expressions with invalid spectrum_id_pattern."""
config = {
"ms2rescore": {
"psm_id_pattern": None,
"spectrum_id_pattern": r"spectrum_\d+", # No capturing group
"psm_id_rt_pattern": None,
"psm_id_im_pattern": None,
}
}
with pytest.raises(
MS2RescoreConfigurationError, match="should contain exactly one capturing group"
):
_validate_regular_expressions(config)
71 changes: 71 additions & 0 deletions tests/test_parse_psms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json
from pathlib import Path

import pytest
from psm_utils import PSM, PSMList

from ms2rescore.exceptions import MS2RescoreConfigurationError
from ms2rescore.parse_psms import parse_psms


@pytest.fixture(scope="module")
def default_config():
cfg_path = Path(__file__).parents[1] / "ms2rescore" / "package_data" / "config_default.json"
cfg = json.loads(cfg_path.read_text())["ms2rescore"]
return cfg


@pytest.fixture
def psm_list_factory():
def _factory(ids):
return PSMList(
psm_list=[
PSM(
peptidoform="PEPTIDE/2",
run="run1",
spectrum_id=sid,
retention_time=None,
ion_mobility=None,
precursor_mz=None,
)
for sid in ids
]
)

return _factory


def test_psm_id_pattern_success(default_config, psm_list_factory):
psm_list = psm_list_factory(["scan:1:fileA", "scan:2:fileA"])
# Ensure at least one decoy is present so parse_psms does not raise
psm_list[0].is_decoy = True
config = dict(default_config)
config.update(
{
"psm_id_pattern": r"scan:(\d+):.*",
"lower_score_is_better": True,
"psm_file": [],
"psm_reader_kwargs": {},
"id_decoy_pattern": None,
}
)

result = parse_psms(config, psm_list)
assert list(result["spectrum_id"]) == ["1", "2"]


def test_psm_id_pattern_collapses_unique_ids(default_config, psm_list_factory):
psm_list = psm_list_factory(["scan:1:fileA", "scan:1:fileB"])
config = dict(default_config)
config.update(
{
"psm_id_pattern": r"scan:(\d+):.*",
"lower_score_is_better": True,
"psm_file": [],
"psm_reader_kwargs": {},
"id_decoy_pattern": None,
}
)

with pytest.raises(MS2RescoreConfigurationError):
parse_psms(config, psm_list)
Loading
Loading