import numpy as np
from rdkit.Chem.rdMolDescriptors import CalcMolFormula
from rdkit import Chem
import chemparse
import itertools
from tqdm import tqdm
import multiprocessing as mp
import re
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.*')
from .identifier_utils import is_mol, is_smiles, is_formula
from molmass import Formula
from . import spectral_operations as so
from .chem_utils import replace_adduct_string, calculate_precursormz
from .constant import proton_mass
# key functions: electronic denoising and formula denoising
_numpy_formula_format = np.int16
import warnings
warnings.filterwarnings("ignore")
[docs]
def spectral_denoising_batch(msms_query, smiles_query, adduct_query, mass_tolerance = 0.005):
"""
Perform batch spectral denoising on multiple sets of MS/MS spectra, SMILES strings, and adducts. Uses multiprocessing to parallelize the denoising process.
Parameters:
msms_query (list): A list of MS/MS spectra data.
smiles_query (list): A list of SMILES strings corresponding to the MS/MS spectra.
adduct_query (list): A list of adducts corresponding to the MS/MS spectra.
mass_tolerance (float, optional): The allowed deviation for the denoising process. Default is 0.005.
Returns:
list: A list of denoised MS/MS from the spectral denoising process.
Notes:
- The lengths of msms_query, smiles_query, and adduct_query must be the same. If not, the function will print an error message and return an empty tuple.
- The function uses multiprocessing to parallelize the denoising process, utilizing 6 processes.
"""
print('i am in new')
if len(msms_query) != len(smiles_query) or len(msms_query) != len(adduct_query):
print('The length of msms, smiles and adduct should be the same')
return ()
with mp.Pool(processes=6) as pool:
# Use starmap to handle multiple parameters
results = pool.starmap(spectral_denoising, tqdm([(msms, smiles, adduct, mass_tolerance) for msms, smiles, adduct
in zip(msms_query, smiles_query,adduct_query)], total=len(adduct_query)))
return results
[docs]
def spectral_denoising(msms, smiles, adduct, mass_tolerance = 0.005):
"""
Perform spectral denoising on the given mass spectrometry data. The function first performs electronic denoising, followed by formula denoising.
Parameters:
msms (numpy.array): The mass spectrometry data to be denoised.
smiles (str): The SMILES representation of the molecule.
adduct (str): The adduct type.
mass_tolerance (float, optional): The mass tolerance for the denoising process. Default is 0.005.
Returns:
numpy.array: The denoised mass spectrometry data.Returns NaN if the input is invalid or if the denoising process fails.
Notes:
- The function first checks if any of the inputs are of type np.nan, which is considered invalid.
- It then performs electronic denoising on the msms data.
- If electronic denoising resulted in empty spectrum (all ions removed), it will return np.nan.
- If successful, it proceeds to formula denoising using the electronic denoised data, smiles, adduct, and mass_tolerance.
"""
if isinstance(msms, float) or isinstance(smiles, float) or isinstance(adduct, float):
# print('the input is invalid')
return np.nan
electronic_denoised = electronic_denoising(msms)
if isinstance(electronic_denoised, float):
return(np.nan)
formula_denoised = formula_denoising(electronic_denoised, smiles, adduct, mass_tolerance)
return formula_denoised
[docs]
def electronic_denoising(msms):
"""
Perform electronic denoising on a given mass spectrometry (MS/MS) spectrum.
This function processes the input MS/MS spectrum by sorting the peaks based on their intensity,
and then iteratively selects and confirms peaks based on a specified intensity threshold.
The confirmed peaks are then packed and sorted before being returned.
Parameters:
msms (np.ndarray): The first item is always m/z and the second item is intensity.
Returns:
np.ndarray: The cleaned spectrum with electronic noises removed. If no ion presents, will return np.nan.
"""
if isinstance(msms, float):
return np.nan
mass, intensity = msms.T[0], msms.T[1]
order = np.argsort(intensity)
mass = mass[order]
intensity = intensity[order]
mass_confirmed = np.array([])
intensity_confirmed = np.array([])
while len(intensity)>0:
seed_intensity = np.max(intensity)
idx_left = np.searchsorted(intensity, seed_intensity*0.999, side= 'left')
mass_temp = mass[idx_left:]
intensity_temp = intensity[idx_left:]
if len(mass_temp)<=3:
mass_confirmed = np.concatenate((mass_confirmed, mass_temp))
intensity_confirmed = np.concatenate((intensity_confirmed,intensity_temp))
intensity = intensity[0:idx_left]
mass = mass[0:idx_left]
if len(mass_confirmed)==0:
return np.nan
return(so.sort_spectrum(so.pack_spectrum(mass_confirmed, intensity_confirmed)) )
from .chem_utils import determine_adduct_charge, determine_parent_coefs, parse_adduct
from .seven_golden_rules import *
[docs]
def get_denoise_tag(frag_msms, all_possible_candidate_formula, all_possible_mass, pmz,has_benzene, mass_threshold):
"""
Determine which ions in the fragment regions are chemically feasible ion.
This function calculates the mass loss for each fragment in the MS/MS data and
searches for candidate formulas within a specified mass threshold. If the
`has_benzene` flag is set, the precursor mass (`pmz`) is adjusted by adding the
mass of the N2O isotope to count for rare cases of forming N2/H2O adducts in the collision chamber.
The ions will be given a True only if it can be associated with at least 1 chemically feasible subformula of the molecular formula.
Args:
frag_msms (numpy.array): Array of fragment MS/MS data, where each tuple contains the mass and intensity of a fragment.
all_possible_candidate_formula (list): List of all possible candidate formulas.
all_possible_mass (numpy.ndarray): Sorted array of all possible masses.
pmz (float): Precursor mass.
has_benzene (bool): Flag indicating if benzene is present.
mass_threshold (float): Mass threshold for searching candidate formulas.
Returns:
list: List of denoise tags for each fragment.
"""
tag = []
if has_benzene:
pmz = pmz+Formula('N2O').isotope.mass
for f in frag_msms:
loss = pmz - f[0] # get loss
idx_left, idx_right = all_possible_mass.searchsorted([loss - mass_threshold, loss + mass_threshold+1E-9])
tag.append(check_candidates(all_possible_candidate_formula[idx_left:idx_right]))
return tag
[docs]
def check_candidates(candidates):
"""
Checks a list of candidates to see if any of them meet a certain ratio condition.
Args:
candidates (list): A list of candidate formulas to be checked.
Returns:
bool: True if at least one candidate meets the ratio condition, False otherwise.
"""
for c in candidates:
if check_ratio(c) and check_huristic(c):
# if check_ratio(c) and check_senior(c):
return True
return False
[docs]
def get_pmz_statistics(msms, c_pmz, mass_tolerance):
"""
Use the real precursor m/z to estimate the mass deviation in a given spectrum.
Parameters:
msms (numpy.ndarray): A 2D array where the first row contains m/z values and the second row contains intensity values.
c_pmz (float): The computed m/z value around which to search for the most intense peak.
mass_tolerance (float): The mass tolerance within which to search for the most intense peak.
Returns:
tuple: A tuple containing:
- r_pmz (float): The actual precursor m/z. If not found (precursor is fully fragmented), the computed m/z is returned.
- float: The deviation between computed and actual precursor m/z, scaled by 1.75 if it exceeds the initial mass tolerance.
"""
msms_T = msms.T
idx_left, idx_right = msms_T[0].searchsorted([c_pmz - 0.01, c_pmz + 0.01])
if idx_left == idx_right:
return c_pmz, mass_tolerance
pmz_idx = np.argmax(msms_T[1][idx_left:idx_right])
r_pmz = msms_T[0][idx_left:idx_right][pmz_idx]
r_deviation = np.abs(c_pmz - r_pmz)
return r_pmz, r_deviation*1.2
[docs]
def has_benzene(molecule):
"""
Check if the given molecule contains a benzene ring.
Args:
molecule (Union[Chem.Mol, str]): The molecule to check. It can be a RDKit molecule object
or a SMILES string.
Returns:
bool: True if the molecule contains a benzene ring, False otherwise.
"""
if is_mol(molecule) == False and is_smiles(molecule) == True:
molecule = Chem.MolFromSmiles(molecule)
elif is_formula(molecule) == True:
return True
benzene = Chem.MolFromSmiles('c1ccccc1') # Aromatic benzene SMILES notation
# Check if benzene is a substructure of the given molecule
return molecule.HasSubstructMatch(benzene)
# below are benchmarked functions
[docs]
def dnl_denoising(msms):
"""
Perform Dynamic noise level estimation denoising on given msms spectra.
Details about the algorithm can be found in the paper: A Dynamic Noise Level Algorithm for Spectral Screening of Peptide MS/MS Spectra.
Parameters:
msms (numpy.ndarray): A 2D numpy array with shape (2, n) where n is the number of data points. For each instance, first item is pmz and second item is intensity.
Returns:
numpy.ndarray: A 2D numpy array containing the denoised mass spectrometry data, sorted and packed. If the input data has only two points and does not meet the criteria, returns NaN.
Notes:
- The function assumes that the input data is a numpy array with two columns.
- The function uses a linear regression model to predict the signal region.
"""
mass, intensity = msms.T[0], msms.T[1]
order= np.argsort(intensity)
mass = mass[order]
intensity = intensity[order]
from sklearn.linear_model import LinearRegression
if intensity[1]/2>=intensity[0]*1.5:
signal_idx = 1
else:
k = 2
if len(mass)==2:
return(np.nan)
for k in range(2, len(mass)):
I = intensity[0:k]
i = np.arange(1,k+1)
model = LinearRegression().fit(i.reshape((-1,1)), I)
i_predicted = model.predict(np.array([k+1]).reshape(-1,1))
if intensity[k]/ i_predicted >2:
break
signal_idx = k
mass_signal = mass[signal_idx:]
intensity_signal = intensity[signal_idx:]
return(so.sort_spectrum(so.pack_spectrum(mass_signal, intensity_signal)))
[docs]
def ms_reduce(msms, reduce_factor = 90):
"""
Reimplementation of MS-Reduce algorithm.
Details about this algorithm can be found at: MS-REDUCE: an ultrafast technique for reduction of big mass spectrometry data for high-throughput processing
Parameters:
msms (numpy.ndarray): A 2D numpy array with shape (2, n) where n is the number of data points. For each instance, first item is pmz and second item is intensity.
reduce_factor (int, optional): The percentage by which to reduce the number of peaks. Default is 90.
Returns:
numpy.ndarray: The reduced MS/MS spectrum as a 2D numpy array, sorted and packed.
"""
mass, intensity = msms.T[0], msms.T[1]
n_chose_peak = np.int32(np.ceil(len(mass)*(1-reduce_factor/100)))
order = np.argsort(intensity)
mass = mass[order]
intensity = intensity[order]
mass_taken = np.array([])
intensity_taken = np.array([])
for i in range(0,11):
idx_left = np.searchsorted(intensity, np.max(intensity)*(11-i-1)/(11), side = 'left')
idx_right = np.searchsorted(intensity, np.max(intensity)*(11-i)/(11), side = 'right')
factor = (n_chose_peak-len(mass_taken))/(idx_right-idx_left)
if factor>1:
factor = 1
sampled_n = np.int32(np.floor(factor*(idx_right-idx_left)))
sampled_mass = np.random.choice(mass[idx_left:idx_right], size=sampled_n, replace=False)
sampled_intensity = np.random.choice(intensity[idx_left:idx_right], size=sampled_n, replace=False)
mass_taken = np.concatenate([mass_taken, sampled_mass])
intensity_taken= np.concatenate([intensity_taken, sampled_intensity])
if factor<1:
break
return so.sort_spectrum(so.pack_spectrum(mass_taken, intensity_taken))
[docs]
def threshold_denoising(msms, threshold = 1):
"""
The most widely used and simple denoising algorithm, which discard all peaks below a predefined threshold.
This function filters out peaks in the mass spectrometry spectrum whose
intensity is below a specified threshold percentage of the maximum intensity.
Parameters:
msms (numpy.ndarray): A 2D numpy array with shape (2, n) where n is the number of data points. For each instance, first item is pmz and second item is intensity.
threshold (float, optional): The threshold percentage (0-100) of the maximum intensity below which peaks will be removed. Default is 1.
Returns:
numpy.ndarray: denoised spectrum as a 2D numpy array, sorted and packed.
"""
mass, intensity = msms.T[0], msms.T[1]
intensity_percent = intensity/np.max(intensity)
to_keep = intensity_percent>(threshold/100)
mass = mass[to_keep]
intensity = intensity[to_keep]
return(so.pack_spectrum(mass, intensity))