import pandas as pd
import re
import os
from . import spectral_operations as so
import re
import shutil
import numpy as np
from tqdm import tqdm
from fuzzywuzzy import fuzz
import json
# [docs]
def read_msp(file_path):
    """
    Reads an MSP file into a pandas DataFrame and cleans each MS/MS spectrum.

    Parsing rules:
        - Empty lines are skipped.
        - A line containing ':' is a ``key: value`` metadata entry; keys are lower-cased.
          A ``name`` entry closes the current spectrum and starts a new one.
        - Any other line whose first character is a digit is parsed as a
          whitespace-separated ``m/z intensity`` pair.

    Each spectrum's peak list is converted to a numpy array and passed through
    ``so.remove_zero_ions`` and then ``so.sort_spectrum``. Metadata columns are converted
    to numeric when every value parses, and column names go through ``standardize_col``.

    Args:
        file_path (str): path of the MSP file to read.
    Returns:
        pd.DataFrame: one row per spectrum. A file without any spectrum yields an empty DataFrame.
    Raises:
        FileNotFoundError: if file_path does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    spectra = []
    spectrum = {}
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # Skip empty lines
            # Handle metadata
            if ":" in line:
                key, value = line.split(":", 1)
                key = key.strip().lower()
                value = value.strip()
                if key == 'name':
                    # Save current spectrum and start a new one
                    if spectrum:
                        spectrum.setdefault('peaks', [])
                        spectra.append(spectrum)
                    spectrum = {'name': value, 'peaks': []}
                else:
                    spectrum[key] = value
            # Handle peak data (assumed to start with a number)
            elif line[0].isdigit():
                peaks = line.split()
                m_z = float(peaks[0])
                intensity = float(peaks[1])
                # setdefault: a peak line may appear before any 'Name:' entry was seen
                spectrum.setdefault('peaks', []).append([m_z, intensity])
    # Save the last spectrum
    if spectrum:
        spectrum.setdefault('peaks', [])
        spectra.append(spectrum)
    df = pd.DataFrame(spectra)
    # An MSP file without spectra produces a frame with no 'peaks' column at all
    if 'peaks' in df.columns:
        df['peaks'] = [so.sort_spectrum(so.remove_zero_ions(np.array(peak))) for peak in df['peaks']]
    for column in df.columns:
        if column == 'peaks':  # Skip 'peaks' column
            continue
        try:
            df[column] = pd.to_numeric(df[column], errors='raise')
        except (ValueError, TypeError):
            # Best effort: columns that are not fully numeric keep their string values
            pass
    df = standardize_col(df)
    return df
# [docs]
def write_to_msp(df, file_path, msms_col = 'peaks', normalize = False):
    """
    Pair function of read_msp.
    Exports a pandas DataFrame to an MSP file.

    For every row the function writes a ``Name:`` line (when a 'name' column exists), one
    ``<Column>: <value>`` line for each remaining column whose name does not contain 'peak',
    a ``Num Peaks:`` line, one ``m/z intensity`` line per peak, and a blank separator line.
    Rows whose spectrum is a float (e.g. NaN for a missing spectrum) are skipped.

    Args:
        df (pd.DataFrame): DataFrame containing spectrum information. Should have columns for 'name', the spectrum column, and other metadata.
        file_path (str): Destination path for the MSP file.
        msms_col (str): Name of the column holding the spectra to write. Defaults to 'peaks'.
        normalize (bool): If True, every spectrum is passed through ``so.normalize_spectrum`` before writing.
            The input DataFrame itself is not modified.
    Returns:
        None
    """
    spectra = df[msms_col]
    if normalize:
        # Normalize into a separate list so the caller's DataFrame is left untouched
        spectra = [so.normalize_spectrum(peak) for peak in spectra]
    has_name = 'name' in df.columns
    # Metadata columns are the same for every row, so work them out once
    metadata_cols = [col for col in df.columns
                     if col not in ['name', msms_col] and 'peak' not in col]
    with open(file_path, 'w') as f:
        for (_, row), peaks in zip(df.iterrows(), spectra):
            # A float here means a missing spectrum; nothing to write for this row
            if isinstance(peaks, float):
                continue
            # Write the name of the spectrum
            if has_name:
                f.write(f"Name: {row['name']}\n")
            # Write other metadata if available
            for col in metadata_cols:
                f.write(f"{col.capitalize()}: {row[col]}\n")
            # Write the peaks (each peak is an (m/z, intensity) pair)
            f.write(f"Num Peaks: {len(peaks)}\n")
            for mz, intensity in peaks:
                f.write(f"{mz} {intensity}\n")
            # Separate spectra by an empty line
            f.write("\n")
# [docs]
def save_df(df, save_path):
    """
    Pair function of read_df.
    Save a DataFrame containing MS/MS spectra to a CSV file, converting any columns containing 2D numpy arrays to string format.

    Args:
        df (pandas.DataFrame): The DataFrame to be saved. It is copied first, so the input is not modified.
        save_path (str): The file path where the DataFrame should be saved. If the path does not end with '.csv', it will be appended automatically.
    Returns:
        None
    Notes:
        - Spectrum columns are detected from the first row only: a column is converted when its first
          value is a 2D numpy array with a second dimension of size 2.
        - An empty DataFrame has no first row, so no column is converted and only the header is written.
        - Detected columns are converted with ``so.arr_to_str`` before saving to the CSV file.
        - The function prints the detected column names and uses tqdm to display a progress bar
          while converting each of them.
    """
    data = df.copy()
    cols = []
    if len(df) > 0:
        for c in df.columns:
            first_value = df[c].iloc[0]
            # Require ndim == 2 explicitly so 1-D arrays are skipped instead of raising IndexError
            if (isinstance(first_value, np.ndarray)
                    and first_value.ndim == 2
                    and first_value.shape[1] == 2):
                cols.append(c)
    print(cols)
    if not save_path.endswith('.csv'):
        save_path = save_path + '.csv'
    for col in cols:
        data[col] = [so.arr_to_str(spec) for spec in tqdm(data[col], total=len(data))]
    data.to_csv(save_path, index=False)
# [docs]
def read_df(path, keep_ms1_only = False):
    """
    Pair function of save_df.
    Reads a CSV file into a DataFrame and converts spectra stored as strings back to arrays
    (strings are used to avoid storage issues in csv files).

    Steps:
        1. Read the CSV with ``pd.read_csv``.
        2. Every column whose first value passes ``check_pattern`` (a string containing a tab)
           is converted value-by-value with ``so.str_to_arr``.
        3. Column names are passed through ``standardize_col``.
        4. Unless ``keep_ms1_only`` is True, rows with a missing 'peaks' value are dropped.
        5. If the first non-missing 'peaks' value is still a string containing ':', every string
           in 'peaks' is converted with ``so.msdial_to_array``; missing values are left as they are.
        6. The index is reset.

    Args:
        path (str): The file path to the CSV file.
        keep_ms1_only (bool): If True, rows without a 'peaks' value are kept. Defaults to False.
    Returns:
        pandas.DataFrame: The processed DataFrame with specific columns converted.
    Raises:
        FileNotFoundError: If the file at the specified path does not exist.
        pd.errors.EmptyDataError: If the CSV file is empty.
        pd.errors.ParserError: If the CSV file contains parsing errors.
        KeyError: If keep_ms1_only is False and there is no 'peaks' column after standardization.
    Notes:
        - The function assumes that the first row of the CSV file contains the column headers.
        - A CSV with headers but no data rows is returned without any conversion.
        - NOTE(review): step 5 runs whether or not keep_ms1_only is set; confirm this matches
          the intended behaviour for MS1-only tables.
    """
    df = pd.read_csv(path)
    print('done read in df...')
    if len(df) > 0:  # iloc[0] below needs at least one row
        for col in df.columns:
            if check_pattern(df[col].iloc[0]):
                df[col] = [so.str_to_arr(value) for value in df[col]]
    df = standardize_col(df)
    if not keep_ms1_only:
        df.dropna(subset=['peaks'], inplace=True)
    if 'peaks' in df.columns:
        present = df['peaks'].dropna()
        if len(present) > 0:
            first_spectrum = present.iloc[0]
            # Only strings can be in the 'mz:intensity' text form; arrays converted above are skipped
            if isinstance(first_spectrum, str) and ':' in first_spectrum:
                df['peaks'] = [so.msdial_to_array(spec) if isinstance(spec, str) else spec
                               for spec in df['peaks']]
    df.reset_index(drop=True, inplace=True)
    return df
from .constant import standard_mapping
# [docs]
def standardize_col(df):
    """
    Renames the columns of a DataFrame to their standard names. Helps when reading and
    processing MS-DIAL generated msp files.

    Each column name is lower-cased; the substring 'reference_' is removed unless the
    lower-cased name is exactly 'reference_precursor_mz'; the result is then looked up in the
    module-level ``standard_mapping`` and replaced by the mapped name when one exists.

    Args:
        df (pd.DataFrame): The DataFrame whose column names need to be standardized.
            Its columns are reassigned in place.
    Returns:
        pd.DataFrame: The same DataFrame object, with standardized column names.
    """
    def _standard_name(label):
        # Lower-case first so the lookup is case-insensitive
        key = label.lower()
        if key != 'reference_precursor_mz':
            key = key.replace('reference_', '')
        mapped = standard_mapping.get(key)
        return key if mapped is None else mapped

    df.columns = [_standard_name(label) for label in df.columns]
    return df
# [docs]
def check_pattern(input_string):
    """
    Helper function for read_df.
    Tells whether a value looks like a spectrum stored as text, i.e. whether it is a string
    that contains at least one tab character.

    Args:
        input_string: value to check; anything that is not a str yields False.
    Returns:
        bool: True if the value is a string containing a tab, False otherwise
    """
    return isinstance(input_string, str) and '\t' in input_string
# [docs]
def export_denoising_searches(results, save_dir, top_n = 10):
    """
    Pair function of import_denoising_searches.
    Exports the results of a denoising search as MSP files, one file per non-empty result.

    For every non-empty result the first ``top_n`` rows are written with ``write_to_msp``
    (using the 'query_peaks_denoised' column as the spectrum) to
    ``<save_dir>/denoising_search_<index>_<precursor_mz>.msp``, where ``<index>`` is the
    position of the result in ``results`` and ``<precursor_mz>`` is taken from the first row,
    formatted with four decimals.

    Args:
        results (list): The list of result DataFrames from a denoising search.
        save_dir (str): Directory where the MSP files are written. It is created if it does not exist.
        top_n (int): Number of leading rows of each result to export. Defaults to 10.
    Returns:
        None
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test
    os.makedirs(save_dir, exist_ok=True)
    for i, result in enumerate(results):
        if result.empty:
            continue
        top_hits = result.head(top_n)
        if top_hits.empty:  # e.g. top_n == 0: there is no first row to name the file after
            continue
        pmz_temp = top_hits.iloc[0]['precursor_mz']
        out_path = os.path.join(save_dir, f"denoising_search_{i}_{pmz_temp:0.4f}.msp")
        write_to_msp(top_hits, out_path, msms_col='query_peaks_denoised')