import abc
import collections
import copy
import datetime
import enum
import functools
import json
import math
import os
import pickle
import re
import urllib.request
from dataclasses import dataclass, field
from typing import (
Any,
BinaryIO,
Dict,
List,
Optional,
Set,
Sequence,
Tuple,
Union,
)
from urllib.error import URLError
import appdirs
import fastobo
import lark
try:
import pyteomics.cmass as pmass
except ImportError:
import pyteomics.mass as pmass
# Set to None to disable caching.
cache_dir = appdirs.user_cache_dir("spectrum_utils", False)
# noinspection PyArgumentList
LookupType = enum.Enum("LookupType", "ACCESSION NAME MASS")
# noinspection PyArgumentList
LabelType = enum.Enum("LabelType", "XL BRANCH GENERAL")
[docs]@dataclass
class Ion:
ion: str
[docs]@dataclass
class Charge:
charge: int
ions: Optional[List[Ion]] = None
[docs]class ModificationSource(abc.ABC):
[docs] @abc.abstractmethod
def get_mass(self) -> float:
"""
Get the mass of the modification from its source.
Returns
-------
float
The modification mass.
"""
return math.nan
[docs]class CvEntry(ModificationSource):
controlled_vocabulary: str
accession: str
name: str
[docs] def __init__(
self,
controlled_vocabulary: Optional[str] = None,
accession: Optional[str] = None,
name: Optional[str] = None,
):
super().__init__()
self._controlled_vocabulary = controlled_vocabulary
self._accession = accession
self._name = name
self._mass = None
def __repr__(self):
return (
f"CvEntry(controlled_vocabulary={self.controlled_vocabulary}, "
f"accession={self.accession}, name={self.name})"
)
@property
def controlled_vocabulary(self) -> str:
if self._controlled_vocabulary is None:
for cv in ("UNIMOD", "MOD"):
try:
self._read_from_cv(cv, name=self._name)
self.controlled_vocabulary = cv
break
except KeyError:
pass
else:
raise KeyError(
f'Term "{self._name}" not found in UNIMOD or PSI-MOD'
)
return self._controlled_vocabulary
@controlled_vocabulary.setter
def controlled_vocabulary(self, controlled_vocabulary: str):
self._controlled_vocabulary = controlled_vocabulary
@property
def accession(self) -> str:
if self._accession is None:
self._read_from_cv(self.controlled_vocabulary, name=self._name)
return self._accession
@accession.setter
def accession(self, accession: str):
self._accession = accession
@property
def name(self) -> str:
if self._name is None:
self._read_from_cv(
self.controlled_vocabulary, accession=self._accession
)
return self._name
@name.setter
def name(self, name: str):
self._name = name
[docs] def get_mass(self) -> float:
"""
Get the mass of the modification, as defined by its CV term.
Returns
-------
float
The modification mass.
"""
if self._mass is None:
# Force resolving from the CV.
if self._accession is not None:
self._read_from_cv(
self.controlled_vocabulary, accession=self._accession
)
elif self._name is not None:
self._read_from_cv(self.controlled_vocabulary, name=self._name)
return self._mass
[docs] def _read_from_cv(
self,
cv: str,
accession: Optional[str] = None,
name: Optional[str] = None,
):
"""
Read the term in its controlled vocabulary entry.
Parameters
----------
cv : str
The controlled vocabulary identifier.
accession : Optional[str]
The accession to query the CV. Set as `None` to query by name,
name : Optional[str]
The name to query the CV. Set as `None` to query by accession,
Raises
------
KeyError
If the term was not found in its controlled vocabulary.
URLError
If the controlled vocabulary could not be retrieved from its online
resource.
ValueError
- If an unknown controlled vocabulary identifier is specified.
- If no mass was specified for a GNO term or its parent terms.
"""
cv_by_accession, cv_by_name = _import_cv(cv, cache_dir)
key, lookup_type = None, None
try:
if accession is not None:
key, lookup_type = self.accession, "accession"
self._mass, self.name = cv_by_accession[key]
elif name is not None:
key, lookup_type = self.name, "name"
self._mass, self.accession = cv_by_name[key]
else:
raise ValueError("No name or accession given")
except KeyError:
raise KeyError(
f"Term {key} not found in the {cv} controlled vocabulary by "
f"{lookup_type}"
)
[docs]@dataclass
class Mass(ModificationSource):
mass: float
controlled_vocabulary: Optional[str] = None
[docs] def get_mass(self) -> float:
"""
Get the mass of the modification.
Returns
-------
float
The modification mass.
"""
return self.mass
[docs]@dataclass
class Monosaccharide:
monosaccharide: str
count: int
[docs]@dataclass
class Glycan(ModificationSource):
composition: List[Monosaccharide]
_mass: float = field(default=None, init=False, repr=False)
[docs] def get_mass(self) -> float:
"""
Get the mass of the modification, computed from its monosaccharide
composition.
Returns
-------
float
The modification mass.
Raises
------
URLError
If the monosaccharide definitions could not be retrieved from its
online resource.
"""
if self._mass is None:
mono = _import_cv("mono", cache_dir)
self._mass = sum(
[mono[m.monosaccharide] * m.count for m in self.composition]
)
return self._mass
[docs]@dataclass
class Info:
message: str
[docs]@dataclass
class Label:
type: LabelType
label: str
score: Optional[float] = None
[docs]class Modification:
mass: Optional[float]
position: Union[int, Tuple[int, int], str]
source: List[ModificationSource] = None
label: Optional[Label] = None
_mass: field(default=None, init=False, repr=False)
[docs] def __init__(
self,
mass: Optional[float] = None,
position: Optional[Union[int, Tuple[int, int], str]] = None,
source: Optional[List[ModificationSource]] = None,
label: Optional[Label] = None,
):
self.position = position
self.source = source
self.label = label
self._mass = mass
def __repr__(self):
return (
f"Modification(mass={self._mass}, position={self.position}, "
f"source={self.source}, label={self.label})"
)
@functools.cached_property
def mass(self) -> Optional[float]:
if self._mass is None and self.source is not None:
for source in self.source:
if isinstance(source, ModificationSource):
source_mass = source.get_mass()
if source_mass is not None and not math.isnan(source_mass):
self._mass = source_mass
break
return self._mass
# noinspection PyMethodMayBeStatic, PyPep8Naming
[docs]def _modification_sort_key(mod: Modification):
"""
Key to sort modifications.
The sort order is:
1. Unknown modifications.
2. Labile modifications.
3. C-terminal modifications.
4. Modifications on specific amino acids and modification ranges based on
the start of the range.
5. N-terminal modifications.
"""
if isinstance(mod.position, int):
return mod.position
elif isinstance(mod.position, tuple):
return mod.position[0]
elif mod.position == "N-term":
return -1
elif mod.position == "C-term":
return math.inf
elif mod.position == "labile":
return -2
elif mod.position == "unknown":
return -3
elif mod.position == "global":
return -4
[docs]def parse(proforma: str) -> List[Proteoform]:
"""
Parse a ProForma-encoded string.
The string is parsed by building an abstract syntax tree to interpret the
ProForma language. Next, modifications are localized to residue positions
and translated into Python objects.
Parameters
----------
proforma : str
The ProForma string.
Returns
-------
List[Proteoform]
A list of proteoforms with their modifications (and additional)
information specified by the ProForma string.
Raises
------
URLError
If a controlled vocabulary could not be retrieved from its online
resource.
KeyError
If a term was not found in its controlled vocabulary.
NotImplementedError
Mass calculation using a molecular formula that contains isotopes (not
supported by Pyteomics mass calculation).
ValueError
If no mass was specified for a GNO term or its parent terms.
"""
dir_name = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(dir_name, "proforma.ebnf")) as f_in:
parser = lark.Lark(
f_in.read(),
start="proforma",
parser="earley",
lexer="dynamic_complete",
import_paths=[dir_name],
)
# noinspection PyUnresolvedReferences
try:
return ProFormaTransformer().transform(parser.parse(proforma))
except lark.visitors.VisitError as e:
raise e.orig_exc
[docs]@functools.lru_cache
def _import_cv(
cv_id: str, cache: Optional[str]
) -> Union[
Tuple[Dict[str, Tuple[float, str]], Dict[str, Tuple[float, str]]],
Dict[str, float],
]:
"""
Import a ProForma controlled vocabulary from its online resource.
Parameters
----------
cv_id : str
The controlled vocabulary identifier.
cache : Optional[str]
Directory used to cache downloaded controlled vocabularies, or None to
disable caching.
Returns
-------
Union[
Tuple[Dict[str, Tuple[float, str]], Dict[str, Tuple[float, str]]],
Dict[str, float],
]
- For modification controlled vocabularies:
A tuple with mappings (i) from term accession to modification mass
and term name, and (ii) from term name to modification mass and term
accession.
- For the monosaccharide controlled vocabulary:
A dictionary with as keys the monosaccharide strings and as values
the corresponding masses.
Raises
------
URLError
If the controlled vocabulary could not be retrieved from its online
resource.
ValueError
- If an unknown controlled vocabulary identifier is specified.
- If no mass was specified for a GNO term or its parent terms.
"""
if cv_id == "UNIMOD":
url = "http://www.unimod.org/obo/unimod.obo"
elif cv_id in ("MOD", "RESID"):
# RESID is not available as a separate entity anymore but is part of
# PSI-MOD.
url = (
"https://raw.githubusercontent.com/HUPO-PSI/psi-mod-CV/master/"
"PSI-MOD.obo"
)
elif cv_id == "XLMOD":
url = (
"https://raw.githubusercontent.com/HUPO-PSI/mzIdentML/master/"
"cv/XLMOD.obo"
)
elif cv_id == "GNO":
url = (
"https://github.com/glygen-glycan-data/GNOme/releases/latest/"
"download/GNOme.obo"
)
elif cv_id == "mono":
url = (
"https://raw.githubusercontent.com/HUPO-PSI/ProForma/master/"
"monosaccharides/mono.obo.json"
)
else:
raise ValueError(f"Unknown controlled vocabulary: {cv_id}")
# Try to read the CV from the cache first.
cv_cached = _load_from_cache(cache, f"{cv_id}.pkl")
if cv_cached is not None:
cv, date_cache = cv_cached
# Check that the cached CV is up to date.
match = re.fullmatch(
r"^https://(?:raw\.githubusercontent|github)\.com/"
r"([^/]+)/([^/]+)/(?:master|releases/latest/download)/(.+)$",
url,
)
if match is not None:
owner_name, repo_name, filename = match.group(1, 2, 3)
try:
with urllib.request.urlopen(
f"https://api.github.com/repos/{owner_name}/{repo_name}/"
f"commits?path={filename}&per_page=1&page=1"
) as response:
if response.getcode() < 400:
date_url = datetime.datetime.strptime(
json.load(response)[0]["commit"]["author"]["date"],
"%Y-%m-%dT%H:%M:%SZ",
)
if date_cache >= date_url:
return cv
except urllib.error.HTTPError:
pass
else:
# Just use the cached CV if we can't compare timestamps.
return cv
# If we're here it means that we should retrieve the CV from its URL.
with urllib.request.urlopen(url) as response:
if 400 <= response.getcode() < 600:
raise URLError(
f"Failed to retrieve the {cv_id} controlled vocabulary from "
f"its URL"
)
if cv_id in ("UNIMOD", "MOD", "RESID", "XLMOD", "GNO"):
cv = _parse_obo(response, cv_id)
elif cv_id == "mono":
mono_json = json.loads(
response.read().decode(
response.info().get_param("charset", "utf-8")
)
)
cv = {}
for term in mono_json["terms"].values():
mass = float(term["has_monoisotopic_mass"])
cv[term["name"]] = mass
for synonym in term.get("synonym", []):
cv[synonym] = mass
# Save to the cache if enabled.
_store_in_cache(cache, f"{cv_id}.pkl", (cv, datetime.datetime.utcnow()))
return cv
[docs]def _parse_obo(
obo_fh: BinaryIO, cv_id: str
) -> Tuple[Dict[str, Tuple[float, str]], Dict[str, Tuple[float, str]]]:
"""
Parse an OBO controlled vocabulary.
Supported OBO files are: UNIMOD, MOD (including RESID), XLMOD, GNO.
Parameters
----------
obo_fh : BinaryIO
The OBO file handle.
cv_id : str
The controlled vocabulary identifier.
Returns
-------
Tuple[Dict[str, Tuple[float, str]], Dict[str, Tuple[float, str]]]
A tuple with mappings (i) from term accession to modification mass and
term name, and (ii) from term name to modification mass and term
accession.
Raises
------
ValueError
If no mass was specified for a GNO term or its parent terms.
"""
cv_by_accession, cv_by_name, gno_graph = {}, {}, {}
for frame in fastobo.load(obo_fh):
term_accession, term_name, term_mass = str(frame.id), None, None
if isinstance(frame, fastobo.term.TermFrame):
for clause in frame:
if isinstance(clause, fastobo.term.NameClause):
term_name = clause.name.strip()
if cv_id == "GNO" and "molecular weight" in term_name:
term_mass = float(
term_name[
term_name.rindex("weight ")
+ 7 : term_name.rindex(" Da")
]
)
elif cv_id == "RESID" and isinstance(
clause, fastobo.term.DefClause
):
for xref in clause.xrefs:
if xref.id.prefix == "RESID":
term_accession = str(xref.id)
break
elif isinstance(clause, fastobo.term.XrefClause):
term_xref = clause.raw_value()
if (
cv_id == "UNIMOD" and "delta_mono_mass" in term_xref
) or (
cv_id in ("MOD", "RESID")
and "DiffMono" in term_xref
and not term_xref.endswith('"none"')
):
term_mass = float(
term_xref[
term_xref.index('"')
+ 1 : term_xref.rindex('"')
]
)
elif (
cv_id == "XLMOD"
and isinstance(clause, fastobo.term.PropertyValueClause)
and (
clause.property_value.relation.prefix
== "monoIsotopicMass"
)
):
term_mass = float(clause.property_value.value)
elif cv_id == "GNO" and isinstance(
clause, fastobo.term.IsAClause
):
gno_graph[term_accession] = term_name, str(clause.term)
if (
term_accession.startswith(f"{cv_id}:")
and term_mass is not None
):
cv_by_accession[term_accession] = term_mass, term_name
if term_name is not None:
cv_by_name[term_name] = term_mass, term_accession
if cv_id == "GNO":
for term_accession, (term_name, parent_accession) in gno_graph.items():
if term_accession in cv_by_accession:
continue
terms = [(term_accession, term_name)]
while (
parent_accession not in cv_by_accession
and parent_accession in gno_graph
):
parent_name, grandparent_accession = gno_graph[
parent_accession
]
terms.append((parent_accession, parent_name))
parent_accession = grandparent_accession
if parent_accession not in cv_by_accession:
raise ValueError(
f"No mass found for term {term_accession} in the GNO "
f"controlled vocabulary"
)
term_mass = cv_by_accession[parent_accession][0]
for add_accession, add_name in terms:
cv_by_accession[add_accession] = term_mass, add_name
cv_by_name[add_name] = term_mass, add_accession
return cv_by_accession, cv_by_name
[docs]def _store_in_cache(cache: Optional[str], filename: str, obj: Any) -> None:
"""
Store an object in the cache.
Parameters
----------
cache : Optional[str]
Directory where the cached objects are stored, or None to disable
caching.
filename : str
Filename of the stored cache object.
obj : Any
Object to store in the cache.
"""
if cache is not None:
os.makedirs(cache, exist_ok=True)
with open(os.path.join(cache, filename), "wb") as f_out:
pickle.dump(obj, f_out, pickle.HIGHEST_PROTOCOL)
[docs]def _load_from_cache(cache: Optional[str], filename: str) -> Optional[Any]:
"""
Load an object from the cache.
Parameters
----------
cache : Optional[str]
Directory where the cached objects are stored, or None to disable
caching.
filename : str
Filename of the stored cache object.
Returns
-------
Any
The object retrieved from the cache, or None if not found.
"""
if cache is not None:
cache_filename = os.path.join(cache, filename)
if os.path.isfile(cache_filename):
with open(cache_filename, "rb") as f_in:
return pickle.load(f_in)
return None
[docs]def clear_cache(resource_ids: Optional[Sequence[str]] = None) -> None:
"""
Clear the downloaded resources from the cache.
Parameters
----------
resource_ids : Optional[Sequence[str]]
Identifiers of the resources to remove from the cache. If None, all
known files will be removed.
"""
# Clear the in-memory cache.
_import_cv.cache_clear()
# Clear the on-disk cache.
if cache_dir is not None:
if resource_ids is None:
resource_ids = ("UNIMOD", "MOD", "RESID", "XLMOD", "GNO", "mono")
for cv_id in resource_ids:
filename = os.path.join(cache_dir, f"{cv_id}.pkl")
if os.path.isfile(filename):
os.remove(filename)