Spaces:
Sleeping
Sleeping
| import pickle | |
| import re | |
| import time | |
| import random as rand | |
| from os.path import join | |
| from sklearn.model_selection import train_test_split | |
| from src.snapconfig import config | |
| def apply_filter(l_filt, file_name): | |
| try: | |
| file_parts = re.search(r"(\d+)-(\d+)-(\d+.\d+)-(\d+)-(\d+).[pt|npy]", file_name) | |
| l_charge = int(file_parts[4]) | |
| mods = int(file_parts[5]) | |
| except: | |
| print(file_name) | |
| print(file_parts) | |
| if ((l_filt["charge"] == 0 or l_charge <= l_filt["charge"]) # change this back to <= | |
| and (mods <= l_filt["mods"])): | |
| return True | |
| return True#False | |
| def load_file_names(l_filt, l_listing_path, count=None): | |
| 'Load the peptide and corresponding spectra file names that satisfy the filter' | |
| with open(l_listing_path, 'rb') as f: | |
| dir_listing = pickle.load(f) | |
| rand.shuffle(dir_listing) | |
| l_pep_file_names = [] | |
| l_spec_file_names_lists = [] | |
| for pep, spec_list in dir_listing[:count]: | |
| spec_file_list = [] | |
| for spec in spec_list: | |
| if apply_filter(l_filt, spec): | |
| spec_file_list.append(spec) | |
| if spec_file_list: | |
| l_pep_file_names.append(pep) | |
| l_spec_file_names_lists.append(spec_file_list) | |
| assert len(l_pep_file_names) == len(l_spec_file_names_lists) | |
| return l_pep_file_names, l_spec_file_names_lists | |
| if __name__ == '__main__': | |
| charge = config.get_config(section='input', key='charge') | |
| use_mods = config.get_config(section='input', key='use_mods') | |
| num_mods = config.get_config(section='input', key='num_mods') | |
| filt = {'charge': charge, 'mods': num_mods if use_mods else 0} | |
| test_size = config.get_config(section='ml', key='test_size') | |
| train_count = config.get_config(section="ml", key="train_count") | |
| batch_size = config.get_config(section="ml", key="batch_size") | |
| train_count = None if train_count == 0 else train_count | |
| in_tensor_dir = config.get_config(section='preprocess', key='in_tensor_dir') | |
| print(in_tensor_dir) | |
| listing_path = join(in_tensor_dir, 'pep_spec.pkl') | |
| pep_file_names, spec_file_names_lists = load_file_names(filt, listing_path, train_count) | |
| split_rand_state = int(time.time()) | |
| train_peps, test_peps, train_specs, test_specs = train_test_split( | |
| pep_file_names, spec_file_names_lists, test_size=test_size, | |
| random_state=split_rand_state, shuffle=True) | |
| # test_peps, val_peps, test_specs, val_specs = train_test_split( | |
| # test_peps, test_specs, test_size=.1, | |
| # random_state=split_rand_state, shuffle=True) | |
| # get the 100k version | |
| # train_peps = train_peps[:80000] | |
| # train_specs = train_specs[:80000] | |
| # test_peps = test_peps[:20000] | |
| # test_specs = test_specs[:20000] | |
| print("Writing train test split listings as pickles.") | |
| with open(join(in_tensor_dir, "train_peps.pkl"), "wb") as trp: | |
| pickle.dump(train_peps, trp) | |
| with open(join(in_tensor_dir, "train_specs.pkl"), "wb") as trs: | |
| pickle.dump(train_specs, trs) | |
| with open(join(in_tensor_dir, "test_peps.pkl"), "wb") as tep: | |
| pickle.dump(test_peps, tep) | |
| with open(join(in_tensor_dir, "test_specs.pkl"), "wb") as tes: | |
| pickle.dump(test_specs, tes) | |
| # with open(join(in_tensor_dir, "val_peps.pkl"), "wb") as vap: | |
| # pickle.dump(test_peps, vap) | |
| # with open(join(in_tensor_dir, "val_specs.pkl"), "wb") as vas: | |
| # pickle.dump(test_specs, vas) | |