diff --git a/CHANGELOG.md b/CHANGELOG.md index 70b66fb..e7ec95d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed +- Methods that interact directly with the pyMBE dataframe are now private and stored in a dedicated module in `storage/df_management`. These methods have also been refactored to be stateless, i.e. they cannot change behavior during the lifetime of the pyMBE object and the user cannot modify the pyMBE dataframe unless explicitly calling them. This includes the methods: `add_bond_in_df`, `add_value_to_df`, `assign_molecule_id`, `check_if_df_cell_has_a_value`, `check_if_name_is_defined_in_df`, `check_if_multiple_pmb_types_for_name`, `clean_df_row`, `clean_ids_in_df_row`, `copy_df_entry`, `create_variable_with_units`, `convert_columns_to_original_format`, `convert_str_to_bond_object`, `delete_entries_in_df`, `find_bond_key`, `setup_df`. (#145) +- `define_particle_entry_in_df` is now a private method in pyMBE, as it is a convenience method for internal use. (#145) +- The custom `NumpyEncoder` is now a private class in the private module `storage/df_management` because it is only used internally in pyMBE for serialization/deserialization. (#145) + ## [1.0.0] - 2025-10-08 ### Changed diff --git a/pyMBE/pyMBE.py b/pyMBE/pyMBE.py index fa88d27..ab69f8c 100644 --- a/pyMBE/pyMBE.py +++ b/pyMBE/pyMBE.py @@ -25,6 +25,7 @@ import scipy.optimize import logging import importlib.resources +from pyMBE.storage.df_management import _DFManagement as _DFm class pymbe_library(): """ @@ -39,18 +40,6 @@ class pymbe_library(): Kw(`pint.Quantity`): Ionic product of water. Used in the setup of the G-RxMC method. """ - class NumpyEncoder(json.JSONEncoder): - """ - Custom JSON encoder that converts NumPy arrays to Python lists - and NumPy scalars to Python scalars. - """ - def default(self, obj): - if isinstance(obj, np.ndarray): - return obj.tolist() - if isinstance(obj, np.generic): - return obj.item() - return super().default(obj) - def __init__(self, seed, temperature=None, unit_length=None, unit_charge=None, Kw=None): """ Initializes the pymbe_library by setting up the reduced unit system with `temperature` and `reduced_length` @@ -79,39 +68,30 @@ def __init__(self, seed, temperature=None, unit_length=None, unit_charge=None, K unit_charge=unit_charge, temperature=temperature, Kw=Kw) - self.setup_df() + + self.df = _DFm._setup_df() self.lattice_builder = None - self.root = importlib.resources.files(__package__) - return - - def _check_if_name_is_defined_in_df(self, name): - """ - Checks if `name` is defined in `pmb.df`. + self.root = importlib.resources.files(__package__) - Args: - name(`str`): label to check if defined in `pmb.df`. - - Returns: - `bool`: `True` for success, `False` otherwise. - """ - return name in self.df['name'].unique() - - def _check_if_multiple_pmb_types_for_name(self, name, pmb_type_to_be_defined): + def _define_particle_entry_in_df(self,name): """ - Checks if `name` is defined in `pmb.df` with multiple pmb_types. + Defines a particle entry in pmb.df. Args: - name(`str`): label to check if defined in `pmb.df`. - pmb_type_to_be_defined(`str`): pmb object type corresponding to `name`. + name(`str`): Unique label that identifies this particle type.
Returns: - `bool`: `True` for success, `False` otherwise. + index(`int`): Index of the particle in pmb.df """ - if name in self.df['name'].unique(): - current_object_type = self.df[self.df['name']==name].pmb_type.values[0] - if current_object_type != pmb_type_to_be_defined: - raise ValueError (f"The name {name} is already defined in the df with a pmb_type = {current_object_type}, pymMBE does not support objects with the same name but different pmb_types") + if _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): + index = self.df[self.df['name']==name].index[0] + else: + index = len(self.df) + self.df.at [index, 'name'] = name + self.df.at [index,'pmb_type'] = 'particle' + self.df.fillna(pd.NA, inplace=True) + return index def _check_supported_molecule(self, molecule_name,valid_pmb_types): """ @@ -148,61 +128,6 @@ def _check_if_name_has_right_type(self, name, expected_pmb_type, hard_check=True if hard_check: raise ValueError(f"The name {name} has been defined in the pyMBE DataFrame with a pmb_type = {pmb_type}. This function only supports pyMBE objects with pmb_type = {expected_pmb_type}") return False - - def _clean_ids_in_df_row(self, row): - """ - Cleans particle, residue and molecules ids in `row`. - If there are other repeated entries for the same name, drops the row. - - Args: - row(pd.DataFrame): A row from the DataFrame to clean. - """ - columns_to_clean = ['particle_id', - 'particle_id2', - 'residue_id', - 'molecule_id'] - if len(self.df.loc[self.df['name'] == row['name'].values[0]]) > 1: - self.df = self.df.drop(row.index).reset_index(drop=True) - else: - for column_name in columns_to_clean: - self.df.loc[row.index, column_name] = pd.NA - - def add_bond_in_df(self, particle_id1, particle_id2, use_default_bond=False): - """ - Adds a bond entry on the `pymbe.df` storing the particle_ids of the two bonded particles. - - Args: - particle_id1(`int`): particle_id of the type of the first particle type of the bonded particles - particle_id2(`int`): particle_id of the type of the second particle type of the bonded particles - use_default_bond(`bool`, optional): Controls if a bond of type `default` is used to bond particle whose bond types are not defined in `pmb.df`. Defaults to False. - - Returns: - index(`int`): Row index where the bond information has been added in pmb.df. 
- """ - particle_name1 = self.df.loc[(self.df['particle_id']==particle_id1) & (self.df['pmb_type']=="particle")].name.values[0] - particle_name2 = self.df.loc[(self.df['particle_id']==particle_id2) & (self.df['pmb_type']=="particle")].name.values[0] - - bond_key = self.find_bond_key(particle_name1=particle_name1, - particle_name2=particle_name2, - use_default_bond=use_default_bond) - if not bond_key: - return None - self.copy_df_entry(name=bond_key,column_name='particle_id2',number_of_copies=1) - indexs = np.where(self.df['name']==bond_key) - index_list = list (indexs[0]) - used_bond_df = self.df.loc[self.df['particle_id2'].notnull()] - #without this drop the program crashes when dropping duplicates because the 'bond' column is a dict - used_bond_df = used_bond_df.drop([('bond_object','')],axis =1 ) - used_bond_index = used_bond_df.index.to_list() - if not index_list: - return None - for index in index_list: - if index not in used_bond_index: - self.clean_df_row(index=int(index)) - self.df.at[index,'particle_id'] = particle_id1 - self.df.at[index,'particle_id2'] = particle_id2 - break - return index def add_bonds_to_espresso(self, espresso_system) : """ @@ -219,74 +144,7 @@ def add_bonds_to_espresso(self, espresso_system) : espresso_system.bonded_inter.add(bond) else: logging.warning('there are no bonds defined in pymbe.df') - return - - def add_value_to_df(self,index,key,new_value, non_standard_value=False, overwrite=False): - """ - Adds a value to a cell in the `pmb.df` DataFrame. - - Args: - index(`int`): index of the row to add the value to. - key(`str`): the column label to add the value to. - non_standard_value(`bool`, optional): Switch to enable insertion of non-standard values, such as `dict` objects. Defaults to False. - overwrite(`bool`, optional): Switch to enable overwriting of already existing values in pmb.df. Defaults to False. - """ - - token = "#protected:" - - def protect(obj): - if non_standard_value: - return token + json.dumps(obj, cls=self.NumpyEncoder) - return obj - - def deprotect(obj): - if non_standard_value and isinstance(obj, str) and obj.startswith(token): - return json.loads(obj.removeprefix(token)) - return obj - - # Make sure index is a scalar integer value - index = int(index) - assert isinstance(index, int), '`index` should be a scalar integer value.' - idx = pd.IndexSlice - if self.check_if_df_cell_has_a_value(index=index,key=key): - old_value = self.df.loc[index,idx[key]] - if not pd.Series([protect(old_value)]).equals(pd.Series([protect(new_value)])): - name=self.df.loc[index,('name','')] - pmb_type=self.df.loc[index,('pmb_type','')] - logging.debug(f"You are attempting to redefine the properties of {name} of pmb_type {pmb_type}") - if overwrite: - logging.info(f'Overwritting the value of the entry `{key}`: old_value = {old_value} new_value = {new_value}') - if not overwrite: - logging.debug(f"pyMBE has preserved of the entry `{key}`: old_value = {old_value}. 
If you want to overwrite it with new_value = {new_value}, activate the switch overwrite = True ") - return - - self.df.loc[index,idx[key]] = protect(new_value) - if non_standard_value: - self.df[key] = self.df[key].apply(deprotect) - return - - def assign_molecule_id(self, molecule_index): - """ - Assigns the `molecule_id` of the pmb object given by `pmb_type` - - Args: - molecule_index(`int`): index of the current `pmb_object_type` to assign the `molecule_id` - Returns: - molecule_id(`int`): Id of the molecule - """ - - self.clean_df_row(index=int(molecule_index)) - - if self.df['molecule_id'].isnull().values.all(): - molecule_id = 0 - else: - molecule_id = self.df['molecule_id'].max() +1 - - self.add_value_to_df (key=('molecule_id',''), - index=int(molecule_index), - new_value=molecule_id) - - return molecule_id + return def calculate_center_of_mass_of_molecule(self, molecule_id, espresso_system): """ @@ -328,9 +186,10 @@ def calculate_HH(self, molecule_name, pH_list=None, pka_set=None): - If no `pka_set` is given, the pKa values are taken from `pmb.df` - This function should only be used for single-phase systems. For two-phase systems `pmb.calculate_HH_Donnan` should be used. """ - self._check_if_name_is_defined_in_df(name=molecule_name) - self._check_supported_molecule(molecule_name=molecule_name, - valid_pmb_types=["molecule","peptide","protein"]) + _DFm._check_if_name_is_defined_in_df(name = molecule_name, + df = self.df) + self._check_supported_molecule(molecule_name = molecule_name, + valid_pmb_types = ["molecule","peptide","protein"]) if pH_list is None: pH_list=np.linspace(2,12,50) if pka_set is None: @@ -605,26 +464,7 @@ def check_dimensionality(self, variable, expected_dimensionality): correct_dimensionality=variable.check(f"{expected_dimensionality}") if not correct_dimensionality: raise ValueError(f"The variable {variable} should have a dimensionality of {expected_dimensionality}, instead the variable has a dimensionality of {variable.dimensionality}") - return correct_dimensionality - - def check_if_df_cell_has_a_value(self, index,key): - """ - Checks if a cell in the `pmb.df` at the specified index and column has a value. - - Args: - index(`int`): Index of the row to check. - key(`str`): Column label to check. - - Returns: - `bool`: `True` if the cell has a value, `False` otherwise. - """ - idx = pd.IndexSlice - import warnings - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - return not pd.isna(self.df.loc[index, idx[key]]) - - + return correct_dimensionality def check_if_metal_ion(self,key): """ @@ -655,108 +495,6 @@ def check_pka_set(self, pka_set): raise ValueError(f'missing a required key "{required_key}" in entry "{pka_name}" of pka_set ("{pka_entry}")') return - def clean_df_row(self, index, columns_keys_to_clean=("particle_id", "particle_id2", "residue_id", "molecule_id")): - """ - Cleans the columns of `pmb.df` in `columns_keys_to_clean` of the row with index `index` by assigning them a pd.NA value. - - Args: - index(`int`): Index of the row to clean. - columns_keys_to_clean(`list` of `str`, optional): List with the column keys to be cleaned. Defaults to [`particle_id`, `particle_id2`, `residue_id`, `molecule_id`]. - """ - for column_key in columns_keys_to_clean: - self.add_value_to_df(key=(column_key,''),index=index,new_value=pd.NA) - self.df.fillna(pd.NA, inplace=True) - return - - def convert_columns_to_original_format (self, df): - """ - Converts the columns of the Dataframe to the original format in pyMBE. 
- - Args: - df(`DataFrame`): dataframe with pyMBE information as a string - - """ - - columns_dtype_int = ['particle_id','particle_id2', 'residue_id','molecule_id', ('state_one','es_type'),('state_two','es_type'),('state_one','z'),('state_two','z') ] - - columns_with_units = ['sigma', 'epsilon', 'cutoff', 'offset'] - - columns_with_list_or_dict = ['residue_list','side_chains', 'parameters_of_the_potential','sequence', 'chain_map', 'node_map'] - - for column_name in columns_dtype_int: - df[column_name] = df[column_name].astype(pd.Int64Dtype()) - - for column_name in columns_with_list_or_dict: - if df[column_name].isnull().all(): - df[column_name] = df[column_name].astype(object) - else: - df[column_name] = df[column_name].apply(lambda x: json.loads(x) if pd.notnull(x) else x) - - for column_name in columns_with_units: - df[column_name] = df[column_name].apply(lambda x: self.create_variable_with_units(x) if pd.notnull(x) else x) - - df['bond_object'] = df['bond_object'].apply(lambda x: self.convert_str_to_bond_object(x) if pd.notnull(x) else x) - df["l0"] = df["l0"].astype(object) - df["pka"] = df["pka"].astype(object) - return df - - def convert_str_to_bond_object (self, bond_str): - """ - Convert a row read as a `str` to the corresponding ESPResSo bond object. - - Args: - bond_str(`str`): string with the information of a bond object. - - Returns: - bond_object(`obj`): ESPResSo bond object. - - Note: - Current supported bonds are: HarmonicBond and FeneBond - """ - import espressomd.interactions - - supported_bonds = ['HarmonicBond', 'FeneBond'] - m = re.search(r'^([A-Za-z0-9_]+)\((\{.+\})\)$', bond_str) - if m is None: - raise ValueError(f'Cannot parse bond "{bond_str}"') - bond = m.group(1) - if bond not in supported_bonds: - raise NotImplementedError(f"Bond type '{bond}' currently not implemented in pyMBE, accepted types are {supported_bonds}") - params = json.loads(m.group(2)) - bond_id = params.pop("bond_id") - bond_object = getattr(espressomd.interactions, bond)(**params) - bond_object._bond_id = bond_id - return bond_object - - def copy_df_entry(self, name, column_name, number_of_copies): - ''' - Creates 'number_of_copies' of a given 'name' in `pymbe.df`. - - Args: - name(`str`): Label of the particle/residue/molecule type to be created. `name` must be defined in `pmb.df` - column_name(`str`): Column name to use as a filter. - number_of_copies(`int`): number of copies of `name` to be created. 
- - Note: - - Currently, column_name only supports "particle_id", "particle_id2", "residue_id" and "molecule_id" - ''' - - valid_column_names=["particle_id", "residue_id", "molecule_id", "particle_id2" ] - if column_name not in valid_column_names: - raise ValueError(f"{column_name} is not a valid column_name, currently only the following are supported: {valid_column_names}") - df_by_name = self.df.loc[self.df.name == name] - if number_of_copies != 1: - df_by_name_repeated = pd.concat ([df_by_name]*(number_of_copies-1), ignore_index=True) - # Concatenate the new particle rows to `df` - self.df = pd.concat ([self.df,df_by_name_repeated], ignore_index=True) - else: - if not df_by_name[column_name].isnull().values.any(): - df_by_name = df_by_name[df_by_name.index == df_by_name.index.min()] - df_by_name_repeated = pd.concat ([df_by_name]*(number_of_copies), ignore_index=True) - df_by_name_repeated[column_name] = pd.NA - self.df = pd.concat ([self.df,df_by_name_repeated], ignore_index=True) - return - def create_added_salt(self, espresso_system, cation_name, anion_name, c_salt): """ Creates a `c_salt` concentration of `cation_name` and `anion_name` ions into the `espresso_system`. @@ -771,7 +509,7 @@ def create_added_salt(self, espresso_system, cation_name, anion_name, c_salt): c_salt_calculated(`float`): Calculated salt concentration added to `espresso_system`. """ for name in [cation_name, anion_name]: - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Object with name '{name}' is not defined in the DataFrame, no ions will be created.") return self._check_if_name_has_right_type(name=cation_name, @@ -881,7 +619,7 @@ def create_counterions(self, object_name, cation_name, anion_name, espresso_syst This function currently does not support the creation of counterions for hydrogels. 
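The calls to `_DFm._check_if_name_is_defined_in_df(name=..., df=self.df)` above (e.g. in `create_added_salt`) illustrate the stateless design: the helper only inspects the dataframe it receives and never reads or writes `pmb.df` on its own. A minimal sketch of that call pattern, using the internal `_DFManagement` API purely for illustration (the particle row is written by hand here; in pyMBE it would come from `define_particle`):

    from pyMBE.storage.df_management import _DFManagement as _DFm

    df = _DFm._setup_df()              # empty dataframe with the pyMBE column layout
    df.at[0, 'name'] = 'Na'            # hand-made entry, for illustration only
    df.at[0, 'pmb_type'] = 'particle'

    # The check only sees the dataframe passed to it.
    print(_DFm._check_if_name_is_defined_in_df(name='Na', df=df))   # True
    print(_DFm._check_if_name_is_defined_in_df(name='Cl', df=df))   # False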
""" for name in [object_name, cation_name, anion_name]: - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Object with name '{name}' is not defined in the DataFrame, no counterions will be created.") return for name in [cation_name, anion_name]: @@ -933,7 +671,7 @@ def create_hydrogel(self, name, espresso_system): Returns: hydrogel_info(`dict`): {"name":hydrogel_name, "chains": {chain_id1: {residue_id1: {'central_bead_id': central_bead_id, 'side_chain_ids': [particle_id1,...]},...,"node_start":node_start,"node_end":node_end}, chain_id2: {...},...}, "nodes":{node1:[node1_id],...}} """ - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Hydrogel with name '{name}' is not defined in the DataFrame, no hydrogel will be created.") return self._check_if_name_has_right_type(name=name, @@ -1040,28 +778,34 @@ def create_hydrogel_chain(self, node_start, node_end, node_positions, espresso_s espresso_system.part.by_id(node1[0]).add_bond((bond_node1_first_monomer, chain_ids[0])) espresso_system.part.by_id(node2[0]).add_bond((bond_node2_last_monomer, chain_ids[-1])) # Add bonds to data frame - bond_index1 = self.add_bond_in_df(particle_id1=node1[0], - particle_id2=chain_ids[0], - use_default_bond=False) - self.add_value_to_df(key=('molecule_id',''), - index=int(bond_index1), - new_value=molecule_id, - overwrite=True) - self.add_value_to_df(key=('residue_id',''), - index=int (bond_index1), - new_value=residue_ids[0], - overwrite=True) - bond_index2 = self.add_bond_in_df(particle_id1=node2[0], - particle_id2=chain_ids[-1], - use_default_bond=False) - self.add_value_to_df(key=('molecule_id',''), - index=int(bond_index2), - new_value=molecule_id, - overwrite=True) - self.add_value_to_df(key=('residue_id',''), - index=int (bond_index2), - new_value=residue_ids[-1], - overwrite=True) + self.df, bond_index1 = _DFm._add_bond_in_df(df = self.df, + particle_id1 = node1[0], + particle_id2 = chain_ids[0], + use_default_bond = False) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int(bond_index1), + new_value = molecule_id, + overwrite = True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(bond_index1), + new_value = residue_ids[0], + overwrite = True) + self.df, bond_index2 = _DFm._add_bond_in_df(df = self.df, + particle_id1 = node2[0], + particle_id2 = chain_ids[-1], + use_default_bond = False) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int(bond_index2), + new_value = molecule_id, + overwrite = True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(bond_index2), + new_value = residue_ids[-1], + overwrite = True) return chain_molecule_info def create_hydrogel_node(self, node_index, node_name, espresso_system): @@ -1106,7 +850,7 @@ def create_molecule(self, name, number_of_molecules, espresso_system, list_of_fi Note: Despite its name, this function can be used to create both molecules and peptides. 
""" - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Molecule with name '{name}' is not defined in the pyMBE DataFrame, no molecule will be created.") return {} if number_of_molecules <= 0: @@ -1133,21 +877,20 @@ def create_molecule(self, name, number_of_molecules, espresso_system, list_of_fi on_surface=True)[0] else: backbone_vector = np.array(backbone_vector) - - - first_residue = True molecules_info = {} residue_list = self.df[self.df['name']==name].residue_list.values [0] - self.copy_df_entry(name=name, - column_name='molecule_id', - number_of_copies=number_of_molecules) + self.df = _DFm._copy_df_entry(df = self.df, + name = name, + column_name = 'molecule_id', + number_of_copies = number_of_molecules) molecules_index = np.where(self.df['name']==name) molecule_index_list =list(molecules_index[0])[-number_of_molecules:] pos_index = 0 for molecule_index in molecule_index_list: - molecule_id = self.assign_molecule_id(molecule_index=molecule_index) + molecule_id = _DFm._assign_molecule_id(df = self.df, + molecule_index = molecule_index) molecules_info[molecule_id] = {} for residue in residue_list: if first_residue: @@ -1165,10 +908,11 @@ def create_molecule(self, name, number_of_molecules, espresso_system, list_of_fi residue_id = next(iter(residues_info)) # Add the correct molecule_id to all particles in the residue for index in self.df[self.df['residue_id']==residue_id].index: - self.add_value_to_df(key=('molecule_id',''), - index=int (index), - new_value=molecule_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int (index), + new_value = molecule_id, + overwrite = True) central_bead_id = residues_info[residue_id]['central_bead_id'] previous_residue = residue residue_position = espresso_system.part.by_id(central_bead_id).pos @@ -1194,19 +938,22 @@ def create_molecule(self, name, number_of_molecules, espresso_system, list_of_fi backbone_vector=backbone_vector) residue_id = next(iter(residues_info)) for index in self.df[self.df['residue_id']==residue_id].index: - self.add_value_to_df(key=('molecule_id',''), - index=int (index), - new_value=molecule_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int(index), + new_value = molecule_id, + overwrite = True) central_bead_id = residues_info[residue_id]['central_bead_id'] espresso_system.part.by_id(central_bead_id).add_bond((bond, previous_residue_id)) - bond_index = self.add_bond_in_df(particle_id1=central_bead_id, - particle_id2=previous_residue_id, - use_default_bond=use_default_bond) - self.add_value_to_df(key=('molecule_id',''), - index=int (bond_index), - new_value=molecule_id, - overwrite=True) + self.df, bond_index = _DFm._add_bond_in_df(df = self.df, + particle_id1 = central_bead_id, + particle_id2 = previous_residue_id, + use_default_bond = use_default_bond) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int(bond_index), + new_value = molecule_id, + overwrite = True) previous_residue_id = central_bead_id previous_residue = residue molecules_info[molecule_id][residue_id] = residues_info[residue_id] @@ -1230,27 +977,29 @@ def create_particle(self, name, espresso_system, number_of_particles, position=N """ if number_of_particles <=0: return [] - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Particle with name '{name}' is 
not defined in the pyMBE DataFrame, no particle will be created.") return [] self._check_if_name_has_right_type(name=name, expected_pmb_type="particle") # Copy the data of the particle `number_of_particles` times in the `df` - self.copy_df_entry(name=name, - column_name='particle_id', - number_of_copies=number_of_particles) - # Get information from the particle type `name` from the df - z = self.df.loc[self.df['name']==name].state_one.z.values[0] + self.df = _DFm._copy_df_entry(df = self.df, + name = name, + column_name = 'particle_id', + number_of_copies = number_of_particles) + # Get information from the particle type `name` from the df + z = self.df.loc[self.df['name'] == name].state_one.z.values[0] z = 0. if z is None else z - es_type = self.df.loc[self.df['name']==name].state_one.es_type.values[0] + es_type = self.df.loc[self.df['name'] == name].state_one.es_type.values[0] # Get a list of the index in `df` corresponding to the new particles to be created - index = np.where(self.df['name']==name) - index_list =list(index[0])[-number_of_particles:] + index = np.where(self.df['name'] == name) + index_list = list(index[0])[-number_of_particles:] # Create the new particles into `espresso_system` created_pid_list=[] - for index in range (number_of_particles): - df_index=int (index_list[index]) - self.clean_df_row(index=df_index) + for index in range(number_of_particles): + df_index = int(index_list[index]) + _DFm._clean_df_row(df = self.df, + index = df_index) if position is None: particle_position = self.rng.random((1, 3))[0] *np.copy(espresso_system.box_l) else: @@ -1264,7 +1013,10 @@ def create_particle(self, name, espresso_system, number_of_particles, position=N if fix: kwargs["fix"] = 3 * [fix] espresso_system.part.add(**kwargs) - self.add_value_to_df(key=('particle_id',''),index=df_index,new_value=bead_id) + _DFm._add_value_to_df(df = self.df, + key = ('particle_id',''), + index = df_index, + new_value = bead_id) return created_pid_list def create_protein(self, name, number_of_proteins, espresso_system, topology_dict): @@ -1280,53 +1032,47 @@ def create_protein(self, name, number_of_proteins, espresso_system, topology_dic if number_of_proteins <=0: return - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Protein with name '{name}' is not defined in the pyMBE DataFrame, no protein will be created.") return self._check_if_name_has_right_type(name=name, expected_pmb_type="protein") - self.copy_df_entry(name=name, - column_name='molecule_id', - number_of_copies=number_of_proteins) - protein_index = np.where(self.df['name']==name) - protein_index_list =list(protein_index[0])[-number_of_proteins:] - - box_half=espresso_system.box_l[0]/2.0 + self.df = _DFm._copy_df_entry(df = self.df, + name = name, + column_name = 'molecule_id', + number_of_copies = number_of_proteins) + protein_index = np.where(self.df['name'] == name) + protein_index_list = list(protein_index[0])[-number_of_proteins:] + box_half = espresso_system.box_l[0] / 2.0 for molecule_index in protein_index_list: - - molecule_id = self.assign_molecule_id(molecule_index=molecule_index) - + molecule_id = _DFm._assign_molecule_id(df = self.df, + molecule_index = molecule_index) protein_center = self.generate_coordinates_outside_sphere(radius = 1, - max_dist=box_half, - n_samples=1, - center=[box_half]*3)[0] - + max_dist = box_half, + n_samples = 1, + center = [box_half]*3)[0] for residue in topology_dict.keys(): - residue_name = 
re.split(r'\d+', residue)[0] residue_number = re.split(r'(\d+)', residue)[1] residue_position = topology_dict[residue]['initial_pos'] position = residue_position + protein_center - particle_id = self.create_particle(name=residue_name, espresso_system=espresso_system, number_of_particles=1, position=[position], fix = True) - index = self.df[self.df['particle_id']==particle_id[0]].index.values[0] - self.add_value_to_df(key=('residue_id',''), - index=int (index), - new_value=int(residue_number), - overwrite=True) - - self.add_value_to_df(key=('molecule_id',''), - index=int (index), - new_value=molecule_id, - overwrite=True) - - return + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = int(residue_number), + overwrite = True) + _DFm._add_value_to_df(df = self.df, + key = ('molecule_id',''), + index = int(index), + new_value = molecule_id, + overwrite = True) def create_residue(self, name, espresso_system, central_bead_position=None,use_default_bond=False, backbone_vector=None): """ @@ -1342,16 +1088,17 @@ def create_residue(self, name, espresso_system, central_bead_position=None,use_d Returns: residues_info(`dict`): {residue_id:{"central_bead_id":central_bead_id, "side_chain_ids":[particle_id1, ...]}} """ - if not self._check_if_name_is_defined_in_df(name=name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f"Residue with name '{name}' is not defined in the pyMBE DataFrame, no residue will be created.") return self._check_if_name_has_right_type(name=name, expected_pmb_type="residue") # Copy the data of a residue in the `df - self.copy_df_entry(name=name, - column_name='residue_id', - number_of_copies=1) + self.df = _DFm._copy_df_entry(df = self.df, + name = name, + column_name = 'residue_id', + number_of_copies = 1) residues_index = np.where(self.df['name']==name) residue_index_list =list(residues_index[0])[-1:] # search for defined particle and residue names @@ -1366,13 +1113,17 @@ def create_residue(self, name, espresso_system, central_bead_position=None,use_d # Dict structure {residue_id:{"central_bead_id":central_bead_id, "side_chain_ids":[particle_id1, ...]}} residues_info={} for residue_index in residue_index_list: - self.clean_df_row(index=int(residue_index)) + _DFm._clean_df_row(df = self.df, + index = int(residue_index)) # Assign a residue_id if self.df['residue_id'].isnull().all(): residue_id=0 else: residue_id = self.df['residue_id'].max() + 1 - self.add_value_to_df(key=('residue_id',''),index=int (residue_index),new_value=residue_id) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(residue_index), + new_value = residue_id) # create the principal bead central_bead_name = self.df.loc[self.df['name']==name].central_bead.values[0] central_bead_id = self.create_particle(name=central_bead_name, @@ -1415,19 +1166,22 @@ def create_residue(self, name, espresso_system, central_bead_position=None,use_d position=[bead_position], number_of_particles=1)[0] index = self.df[self.df['particle_id']==side_bead_id].index.values[0] - self.add_value_to_df(key=('residue_id',''), - index=int (index), - new_value=residue_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = residue_id, + overwrite = True) side_chain_beads_ids.append(side_bead_id) espresso_system.part.by_id(central_bead_id).add_bond((bond, side_bead_id)) - index = self.add_bond_in_df(particle_id1=central_bead_id, - particle_id2=side_bead_id, - 
use_default_bond=use_default_bond) - self.add_value_to_df(key=('residue_id',''), - index=int (index), - new_value=residue_id, - overwrite=True) + self.df, index = _DFm._add_bond_in_df(df = self.df, + particle_id1 = central_bead_id, + particle_id2 = side_bead_id, + use_default_bond = use_default_bond) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = residue_id, + overwrite = True) elif pmb_type == 'residue': central_bead_side_chain = self.df[self.df['name']==side_chain_element].central_bead.values[0] @@ -1457,53 +1211,42 @@ def create_residue(self, name, espresso_system, central_bead_position=None,use_d residue_id_side_chain=list(lateral_residue_info.keys())[0] # Change the residue_id of the residue in the side chain to the one of the bigger residue index = self.df[(self.df['residue_id']==residue_id_side_chain) & (self.df['pmb_type']=='residue') ].index.values[0] - self.add_value_to_df(key=('residue_id',''), - index=int(index), - new_value=residue_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = residue_id, + overwrite = True) # Change the residue_id of the particles in the residue in the side chain side_chain_beads_ids+=[central_bead_side_chain_id]+lateral_beads_side_chain_ids for particle_id in side_chain_beads_ids: index = self.df[(self.df['particle_id']==particle_id) & (self.df['pmb_type']=='particle')].index.values[0] - self.add_value_to_df(key=('residue_id',''), - index=int (index), - new_value=residue_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int (index), + new_value = residue_id, + overwrite = True) espresso_system.part.by_id(central_bead_id).add_bond((bond, central_bead_side_chain_id)) - index = self.add_bond_in_df(particle_id1=central_bead_id, - particle_id2=central_bead_side_chain_id, - use_default_bond=use_default_bond) - self.add_value_to_df(key=('residue_id',''), - index=int (index), - new_value=residue_id, - overwrite=True) + self.df, index = _DFm._add_bond_in_df(df = self.df, + particle_id1 = central_bead_id, + particle_id2 = central_bead_side_chain_id, + use_default_bond = use_default_bond) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = residue_id, + overwrite = True) # Change the residue_id of the bonds in the residues in the side chain to the one of the bigger residue for index in self.df[(self.df['residue_id']==residue_id_side_chain) & (self.df['pmb_type']=='bond') ].index: - self.add_value_to_df(key=('residue_id',''), - index=int(index), - new_value=residue_id, - overwrite=True) + _DFm._add_value_to_df(df = self.df, + key = ('residue_id',''), + index = int(index), + new_value = residue_id, + overwrite = True) # Internal bookkeeping of the side chain beads ids residues_info[residue_id]['side_chain_ids']=side_chain_beads_ids return residues_info - def create_variable_with_units(self, variable): - """ - Returns a pint object with the value and units defined in `variable`. - - Args: - variable(`dict` or `str`): {'value': value, 'units': units} - Returns: - variable_with_units(`obj`): variable with units using the pyMBE UnitRegistry. 
- """ - if isinstance(variable, dict): - value=variable.pop('value') - units=variable.pop('units') - elif isinstance(variable, str): - value = float(re.split(r'\s+', variable)[0]) - units = re.split(r'\s+', variable)[1] - variable_with_units=value*self.units(units) - return variable_with_units + def define_AA_residues(self, sequence, model): """ @@ -1576,19 +1319,25 @@ def define_bond(self, bond_type, bond_parameters, particle_pairs): offset = lj_parameters["offset"],) index = len(self.df) for label in [f'{particle_name1}-{particle_name2}', f'{particle_name2}-{particle_name1}']: - self._check_if_multiple_pmb_types_for_name(name=label, pmb_type_to_be_defined="bond") + _DFm._check_if_multiple_pmb_types_for_name(name=label, + pmb_type_to_be_defined="bond", + df=self.df) name=f'{particle_name1}-{particle_name2}' - self._check_if_multiple_pmb_types_for_name(name=name, pmb_type_to_be_defined="bond") + _DFm._check_if_multiple_pmb_types_for_name(name=name, + pmb_type_to_be_defined="bond", + df=self.df) self.df.at [index,'name']= name self.df.at [index,'bond_object'] = bond_object self.df.at [index,'l0'] = l0 - self.add_value_to_df(index=index, - key=('pmb_type',''), - new_value='bond') - self.add_value_to_df(index=index, - key=('parameters_of_the_potential',''), - new_value=bond_object.get_params(), - non_standard_value=True) + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('pmb_type',''), + new_value = 'bond') + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('parameters_of_the_potential',''), + new_value = bond_object.get_params(), + non_standard_value = True) self.df.fillna(pd.NA, inplace=True) return @@ -1627,20 +1376,23 @@ def define_default_bond(self, bond_type, bond_parameters, epsilon=None, sigma=No cutoff = cutoff, offset = offset) - self._check_if_multiple_pmb_types_for_name(name='default', - pmb_type_to_be_defined='bond') - + _DFm._check_if_multiple_pmb_types_for_name(name='default', + pmb_type_to_be_defined='bond', + df=self.df) + index = max(self.df.index, default=-1) + 1 self.df.at [index,'name'] = 'default' self.df.at [index,'bond_object'] = bond_object self.df.at [index,'l0'] = l0 - self.add_value_to_df(index = index, - key = ('pmb_type',''), - new_value = 'bond') - self.add_value_to_df(index = index, - key = ('parameters_of_the_potential',''), - new_value = bond_object.get_params(), - non_standard_value=True) + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('pmb_type',''), + new_value = 'bond') + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('parameters_of_the_potential',''), + new_value = bond_object.get_params(), + non_standard_value=True) self.df.fillna(pd.NA, inplace=True) return @@ -1667,21 +1419,23 @@ def define_hydrogel(self, name, node_map, chain_map): if self.lattice_builder.lattice.connectivity != chain_map_connectivity: raise ValueError("Incomplete hydrogel: A diamond lattice must contain correct 16 lattice index pairs") - self._check_if_multiple_pmb_types_for_name(name=name, - pmb_type_to_be_defined='hydrogel') - + _DFm._check_if_multiple_pmb_types_for_name(name=name, + pmb_type_to_be_defined='hydrogel', + df=self.df) index = len(self.df) self.df.at [index, "name"] = name self.df.at [index, "pmb_type"] = "hydrogel" - self.add_value_to_df(index = index, - key = ('node_map',''), - new_value = node_map, - non_standard_value=True) - self.add_value_to_df(index = index, - key = ('chain_map',''), - new_value = chain_map, - non_standard_value=True) + _DFm._add_value_to_df(df = self.df, + index = index, + key = 
('node_map',''), + new_value = node_map, + non_standard_value = True) + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('chain_map',''), + new_value = chain_map, + non_standard_value = True) for chain_label in chain_map: node_start = chain_label["node_start"] node_end = chain_label["node_end"] @@ -1699,35 +1453,16 @@ def define_molecule(self, name, residue_list): name(`str`): Unique label that identifies the `molecule`. residue_list(`list` of `str`): List of the `name`s of the `residue`s in the sequence of the `molecule`. """ - self._check_if_multiple_pmb_types_for_name(name=name, - pmb_type_to_be_defined='molecule') - + _DFm._check_if_multiple_pmb_types_for_name(name=name, + pmb_type_to_be_defined='molecule', + df=self.df) + index = len(self.df) self.df.at [index,'name'] = name self.df.at [index,'pmb_type'] = 'molecule' self.df.at [index,('residue_list','')] = residue_list self.df.fillna(pd.NA, inplace=True) - return - - def define_particle_entry_in_df(self,name): - """ - Defines a particle entry in pmb.df. - - Args: - name(`str`): Unique label that identifies this particle type. - - Returns: - index(`int`): Index of the particle in pmb.df - """ - - if self._check_if_name_is_defined_in_df(name=name): - index = self.df[self.df['name']==name].index[0] - else: - index = len(self.df) - self.df.at [index, 'name'] = name - self.df.at [index,'pmb_type'] = 'particle' - self.df.fillna(pd.NA, inplace=True) - return index + return def define_particle(self, name, z=0, acidity=pd.NA, pka=pd.NA, sigma=pd.NA, epsilon=pd.NA, cutoff=pd.NA, offset=pd.NA,overwrite=False): """ @@ -1752,10 +1487,11 @@ def define_particle(self, name, z=0, acidity=pd.NA, pka=pd.NA, sigma=pd.NA, epsi - The default setup corresponds to the Weeks−Chandler−Andersen (WCA) model, corresponding to purely steric interactions. - For more information on `sigma`, `epsilon`, `cutoff` and `offset` check `pmb.setup_lj_interactions()`. """ - index=self.define_particle_entry_in_df(name=name) - self._check_if_multiple_pmb_types_for_name(name=name, - pmb_type_to_be_defined='particle') - + index=self._define_particle_entry_in_df(name=name) + _DFm._check_if_multiple_pmb_types_for_name(name=name, + pmb_type_to_be_defined='particle', + df=self.df) + # If `cutoff` and `offset` are not defined, default them to the following values if pd.isna(cutoff): cutoff=self.units.Quantity(2**(1./6.), "reduced_length") @@ -1772,10 +1508,11 @@ def define_particle(self, name, z=0, acidity=pd.NA, pka=pd.NA, sigma=pd.NA, epsi if not pd.isna(parameters_with_dimensionality[parameter_key]["value"]): self.check_dimensionality(variable=parameters_with_dimensionality[parameter_key]["value"], expected_dimensionality=parameters_with_dimensionality[parameter_key]["dimensionality"]) - self.add_value_to_df(key=(parameter_key,''), - index=index, - new_value=parameters_with_dimensionality[parameter_key]["value"], - overwrite=overwrite) + _DFm._add_value_to_df(df = self.df, + key = (parameter_key,''), + index = index, + new_value = parameters_with_dimensionality[parameter_key]["value"], + overwrite = overwrite) # Define particle acid/base properties self.set_particle_acidity(name=name, @@ -1813,8 +1550,9 @@ def define_peptide(self, name, sequence, model): sequence (`string`): Sequence of the `peptide`. model (`string`): Model name. Currently only models with 1 bead '1beadAA' or with 2 beads '2beadAA' per amino acid are supported. 
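All `define_*` methods now delegate the name-collision guard to `_DFm._check_if_multiple_pmb_types_for_name(name=..., pmb_type_to_be_defined=..., df=...)`. A sketch of the behaviour it is expected to keep from the method removed from `pyMBE.py` earlier in this diff, namely raising `ValueError` when a `name` is reused with a different `pmb_type` (internal API, rows written by hand for illustration):

    from pyMBE.storage.df_management import _DFManagement as _DFm

    df = _DFm._setup_df()
    df.at[0, 'name'] = 'PEP1'
    df.at[0, 'pmb_type'] = 'peptide'

    # Same name and same pmb_type: passes silently.
    _DFm._check_if_multiple_pmb_types_for_name(name='PEP1',
                                               pmb_type_to_be_defined='peptide',
                                               df=df)
    # Same name but a different pmb_type: expected to raise ValueError.
    try:
        _DFm._check_if_multiple_pmb_types_for_name(name='PEP1',
                                                   pmb_type_to_be_defined='particle',
                                                   df=df)
    except ValueError as err:
        print(err)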
""" - self._check_if_multiple_pmb_types_for_name(name = name, - pmb_type_to_be_defined='peptide') + _DFm._check_if_multiple_pmb_types_for_name(name = name, + pmb_type_to_be_defined='peptide', + df=self.df) valid_keys = ['1beadAA','2beadAA'] if model not in valid_keys: @@ -1846,8 +1584,9 @@ def define_protein(self, name,model, topology_dict, lj_setup_mode="wca", overwri """ # Sanity checks - self._check_if_multiple_pmb_types_for_name(name = name, - pmb_type_to_be_defined='protein') + _DFm._check_if_multiple_pmb_types_for_name(name = name, + pmb_type_to_be_defined='protein', + df=self.df) valid_model_keys = ['1beadAA','2beadAA'] valid_lj_setups = ["wca"] if model not in valid_model_keys: @@ -1899,27 +1638,17 @@ def define_residue(self, name, central_bead, side_chains): central_bead(`str`): `name` of the `particle` to be placed as central_bead of the `residue`. side_chains(`list` of `str`): List of `name`s of the pmb_objects to be placed as side_chains of the `residue`. Currently, only pmb_objects of type `particle`s or `residue`s are supported. """ - self._check_if_multiple_pmb_types_for_name(name=name, - pmb_type_to_be_defined='residue') - + _DFm._check_if_multiple_pmb_types_for_name(name=name, + pmb_type_to_be_defined='residue', + df=self.df) + index = len(self.df) self.df.at [index, 'name'] = name self.df.at [index,'pmb_type'] = 'residue' self.df.at [index,'central_bead'] = central_bead self.df.at [index,('side_chains','')] = side_chains self.df.fillna(pd.NA, inplace=True) - return - - def delete_entries_in_df(self, entry_name): - """ - Deletes entries with name `entry_name` from the DataFrame if it exists. - - Args: - entry_name (`str`): The name of the entry in the dataframe to delete. - - """ - if entry_name in self.df["name"].values: - self.df = self.df[self.df["name"] != entry_name].reset_index(drop=True) + return def delete_molecule_in_system(self, molecule_id, espresso_system): """ @@ -1937,7 +1666,8 @@ def delete_molecule_in_system(self, molecule_id, espresso_system): if molecule_row.empty: raise ValueError(f"No molecule found with molecule_id={molecule_id} in the DataFrame.") # Clean molecule from pmb.df - self._clean_ids_in_df_row(row=molecule_row) + self.df = _DFm._clean_ids_in_df_row(df = self.df, + row = molecule_row) # Delete particles and residues in the molecule residue_mask = (self.df['molecule_id'] == molecule_id) & (self.df['pmb_type'] == "residue") residue_rows = self.df.loc[residue_mask] @@ -1952,8 +1682,9 @@ def delete_molecule_in_system(self, molecule_id, espresso_system): for _ in range(number_of_bonds): bond_mask = (self.df['molecule_id'] == molecule_id) & (self.df['pmb_type'] == "bond") bond_rows = self.df.loc[bond_mask] - row=bond_rows.loc[[bond_rows.index[0]]] - self._clean_ids_in_df_row(row=row) + row = bond_rows.loc[[bond_rows.index[0]]] + self.df = _DFm._clean_ids_in_df_row(df = self.df, + row = row) def delete_particle_in_system(self, particle_id, espresso_system): """ @@ -1971,7 +1702,8 @@ def delete_particle_in_system(self, particle_id, espresso_system): if particle_row.empty: raise ValueError(f"No particle found with particle_id={particle_id} in the DataFrame.") espresso_system.part.by_id(particle_id).remove() - self._clean_ids_in_df_row(particle_row) + self.df = _DFm._clean_ids_in_df_row(df = self.df, + row = particle_row) def delete_residue_in_system(self, residue_id, espresso_system): """ @@ -1990,7 +1722,8 @@ def delete_residue_in_system(self, residue_id, espresso_system): 
residue_map=self.get_particle_id_map(object_name=residue_row["name"].values[0])["residue_map"] particle_ids = residue_map[residue_id] # Clean residue from pmb.df - self._clean_ids_in_df_row(row=residue_row) + self.df = _DFm._clean_ids_in_df_row(df = self.df, + row = residue_row) # Delete particles in the residue for particle_id in particle_ids: self.delete_particle_in_system(particle_id=particle_id, @@ -2001,9 +1734,9 @@ def delete_residue_in_system(self, residue_id, espresso_system): for _ in range(number_of_bonds): bond_mask = (self.df['residue_id'] == residue_id) & (self.df['pmb_type'] == "bond") bond_rows = self.df.loc[bond_mask] - row=bond_rows.loc[[bond_rows.index[0]]] - self._clean_ids_in_df_row(row=row) - + row = bond_rows.loc[[bond_rows.index[0]]] + self.df = _DFm._clean_ids_in_df_row(df = self.df, + row = row) def determine_reservoir_concentrations(self, pH_res, c_salt_res, activity_coefficient_monovalent_pair, max_number_sc_runs=200): """ @@ -2123,36 +1856,7 @@ def filter_df(self, pmb_type): """ pmb_type_df = self.df.loc[self.df['pmb_type']== pmb_type] pmb_type_df = pmb_type_df.dropna( axis=1, thresh=1) - return pmb_type_df - - def find_bond_key(self, particle_name1, particle_name2, use_default_bond=False): - """ - Searches for the `name` of the bond between `particle_name1` and `particle_name2` in `pymbe.df` and returns it. - - Args: - particle_name1(`str`): label of the type of the first particle type of the bonded particles. - particle_name2(`str`): label of the type of the second particle type of the bonded particles. - use_default_bond(`bool`, optional): If it is activated, the "default" bond is returned if no bond is found between `particle_name1` and `particle_name2`. Defaults to 'False'. - - Returns: - bond_key (str): `name` of the bond between `particle_name1` and `particle_name2` if a matching bond exists - - Note: - - If `use_default_bond`=`True`, it returns "default" if no key is found. - """ - bond_keys = [f'{particle_name1}-{particle_name2}', f'{particle_name2}-{particle_name1}'] - bond_defined=False - for bond_key in bond_keys: - if bond_key in self.df["name"].values: - bond_defined=True - correct_key=bond_key - break - if bond_defined: - return correct_key - elif use_default_bond: - return 'default' - else: - return + return pmb_type_df def find_value_from_es_type(self, es_type, column_name): """ @@ -2285,11 +1989,12 @@ def get_bond_length(self, particle_name1, particle_name2, hard_check=False, use_ - If `use_default_bond`=True and no bond is defined between `particle_name1` and `particle_name2`, it returns the default bond defined in `pmb.df`. - If `hard_check`=`True` stops the code when no bond is found. 
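The lookup removed above lives on as `_DFm._find_bond_key`, keeping the same logic (try both `name1-name2` and `name2-name1`, optionally fall back to the `default` bond) but taking the dataframe explicitly, as `get_bond_length` does just below. A sketch assuming only the signature shown in this diff (the bond row is faked by hand; in pyMBE it is written by `define_bond`):

    from pyMBE.storage.df_management import _DFManagement as _DFm

    df = _DFm._setup_df()
    df.at[0, 'name'] = 'A-B'
    df.at[0, 'pmb_type'] = 'bond'

    print(_DFm._find_bond_key(df=df, particle_name1='B', particle_name2='A'))  # 'A-B'
    print(_DFm._find_bond_key(df=df, particle_name1='A', particle_name2='C',
                              use_default_bond=True))                          # 'default'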
""" - bond_key = self.find_bond_key(particle_name1=particle_name1, - particle_name2=particle_name2, - use_default_bond=use_default_bond) + bond_key = _DFm._find_bond_key(df = self.df, + particle_name1 = particle_name1, + particle_name2 = particle_name2, + use_default_bond = use_default_bond) if bond_key: - return self.df[self.df['name']==bond_key].l0.values[0] + return self.df[self.df['name'] == bond_key].l0.values[0] else: msg = f"Bond not defined between particles {particle_name1} and {particle_name2}" if hard_check: @@ -2493,7 +2198,6 @@ def initialize_lattice_builder(self, diamond_lattice): logging.info(f"LatticeBuilder initialized with mpc={diamond_lattice.mpc} and box_l={diamond_lattice.box_l}") return self.lattice_builder - def load_interaction_parameters(self, filename, overwrite=False): """ Loads the interaction parameters stored in `filename` into `pmb.df` @@ -2517,7 +2221,8 @@ def load_interaction_parameters(self, filename, overwrite=False): for not_required_key in without_units+with_units: if not_required_key in param_dict.keys(): if not_required_key in with_units: - not_required_attributes[not_required_key]=self.create_variable_with_units(variable=param_dict.pop(not_required_key)) + not_required_attributes[not_required_key] = _DFm._create_variable_with_units(variable=param_dict.pop(not_required_key), + units_registry=self.units) elif not_required_key in without_units: not_required_attributes[not_required_key]=param_dict.pop(not_required_key) else: @@ -2540,16 +2245,21 @@ def load_interaction_parameters(self, filename, overwrite=False): bond_parameters = param_dict.pop('bond_parameters') bond_type = param_dict.pop('bond_type') if bond_type == 'harmonic': - k = self.create_variable_with_units(variable=bond_parameters.pop('k')) - r_0 = self.create_variable_with_units(variable=bond_parameters.pop('r_0')) + k = _DFm._create_variable_with_units(variable=bond_parameters.pop('k'), + units_registry=self.units) + r_0 = _DFm._create_variable_with_units(variable=bond_parameters.pop('r_0'), + units_registry=self.units) bond = {'r_0' : r_0, 'k' : k, } elif bond_type == 'FENE': - k = self.create_variable_with_units(variable=bond_parameters.pop('k')) - r_0 = self.create_variable_with_units(variable=bond_parameters.pop('r_0')) - d_r_max = self.create_variable_with_units(variable=bond_parameters.pop('d_r_max')) + k = _DFm._create_variable_with_units(variable=bond_parameters.pop('k'), + units_registry=self.units) + r_0 = _DFm._create_variable_with_units(variable=bond_parameters.pop('r_0'), + units_registry=self.units) + d_r_max = _DFm._create_variable_with_units(variable=bond_parameters.pop('d_r_max'), + units_registry=self.units) bond = {'r_0' : r_0, 'k' : k, 'd_r_max': d_r_max, @@ -2702,17 +2412,19 @@ def read_pmb_df (self,filename): Note: This function only accepts files with CSV format. 
""" - if filename.rsplit(".", 1)[1] != "csv": raise ValueError("Only files with CSV format are supported") df = pd.read_csv (filename,header=[0, 1], index_col=0) - columns_names = self.setup_df() - + self.df = _DFm._setup_df() + columns_names = pd.MultiIndex.from_frame(self.df) + columns_names = columns_names.names multi_index = pd.MultiIndex.from_tuples(columns_names) df.columns = multi_index - - self.df = self.convert_columns_to_original_format(df) - self.df.fillna(pd.NA, inplace=True) + _DFm._convert_columns_to_original_format(df=df, + units_registry=self.units) + self.df = df + self.df.fillna(pd.NA, + inplace=True) return self.df def read_protein_vtf_in_df (self,filename,unit_length=None): @@ -2811,12 +2523,13 @@ def search_bond(self, particle_name1, particle_name2, hard_check=False, use_defa - If `use_default_bond`=True and no bond is defined between `particle_name1` and `particle_name2`, it returns the default bond defined in `pmb.df`. - If `hard_check`=`True` stops the code when no bond is found. """ - - bond_key = self.find_bond_key(particle_name1=particle_name1, - particle_name2=particle_name2, - use_default_bond=use_default_bond) + + bond_key = _DFm._find_bond_key(df = self.df, + particle_name1 = particle_name1, + particle_name2 = particle_name2, + use_default_bond = use_default_bond) if use_default_bond: - if not self._check_if_name_is_defined_in_df(name="default"): + if not _DFm._check_if_name_is_defined_in_df(name="default", df=self.df): raise ValueError(f"use_default_bond is set to {use_default_bond} but no default bond has been defined. Please define a default bond with pmb.define_default_bond") if bond_key: return self.df[self.df['name']==bond_key].bond_object.values[0] @@ -2842,7 +2555,7 @@ def search_particles_in_residue(self, residue_name): - The function will return an empty list if the residue is not defined in `pmb.df`. - The function will return an empty list if the particles are not defined in the pyMBE DataFrame. ''' - if not self._check_if_name_is_defined_in_df(name=residue_name): + if not _DFm._check_if_name_is_defined_in_df(name=residue_name, df=self.df): logging.warning(f"Residue {residue_name} not defined in pmb.df") return [] self._check_if_name_has_right_type(name=residue_name, expected_pmb_type="residue") @@ -2851,12 +2564,12 @@ def search_particles_in_residue(self, residue_name): list_of_side_chains = self.df.at[index_residue, ('side_chains', '')] list_of_particles_in_residue = [] if central_bead is not pd.NA: - if self._check_if_name_is_defined_in_df(name=central_bead): + if _DFm._check_if_name_is_defined_in_df(name=central_bead, df=self.df): if self._check_if_name_has_right_type(name=central_bead, expected_pmb_type="particle", hard_check=False): list_of_particles_in_residue.append(central_bead) if list_of_side_chains is not pd.NA: for side_chain in list_of_side_chains: - if self._check_if_name_is_defined_in_df(name=side_chain): + if _DFm._check_if_name_is_defined_in_df(name=side_chain, df=self.df): object_type = self.df[self.df['name']==side_chain].pmb_type.values[0] else: continue @@ -2895,64 +2608,78 @@ def set_particle_acidity(self, name, acidity=pd.NA, default_charge_number=0, pka acidity = pd.NA logging.warning("the keyword 'inert' for acidity has been replaced by setting acidity = pd.NA. For backwards compatibility, acidity has been set to pd.NA. 
Support for `acidity = 'inert'` may be deprecated in future releases of pyMBE") - self.define_particle_entry_in_df(name=name) + self._define_particle_entry_in_df(name=name) for index in self.df[self.df['name']==name].index: if pka is not pd.NA: - self.add_value_to_df(key=('pka',''), - index=index, - new_value=pka, - overwrite=overwrite) - - self.add_value_to_df(key=('acidity',''), - index=index, - new_value=acidity, - overwrite=overwrite) - if not self.check_if_df_cell_has_a_value(index=index,key=('state_one','es_type')): - self.add_value_to_df(key=('state_one','es_type'), - index=index, - new_value=self.propose_unused_type(), - overwrite=overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('pka',''), + index = index, + new_value = pka, + overwrite = overwrite) + + _DFm._add_value_to_df(df = self.df, + key = ('acidity',''), + index = index, + new_value = acidity, + overwrite = overwrite) + if not _DFm._check_if_df_cell_has_a_value(df=self.df, index=index,key=('state_one','es_type')): + _DFm._add_value_to_df(df = self.df, + key = ('state_one','es_type'), + index = index, + new_value = self.propose_unused_type(), + overwrite = overwrite) if pd.isna(self.df.loc [self.df['name'] == name].acidity.iloc[0]): - self.add_value_to_df(key=('state_one','z'), - index=index, - new_value=default_charge_number, - overwrite=overwrite) - self.add_value_to_df(key=('state_one','label'), - index=index, - new_value=name, - overwrite=overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_one','z'), + index = index, + new_value = default_charge_number, + overwrite = overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_one','label'), + index = index, + new_value = name, + overwrite = overwrite) else: protonated_label = f'{name}H' - self.add_value_to_df(key=('state_one','label'), - index=index, - new_value=protonated_label, - overwrite=overwrite) - self.add_value_to_df(key=('state_two','label'), - index=index, - new_value=name, - overwrite=overwrite) - if not self.check_if_df_cell_has_a_value(index=index,key=('state_two','es_type')): - self.add_value_to_df(key=('state_two','es_type'), - index=index, - new_value=self.propose_unused_type(), - overwrite=overwrite) - if self.df.loc [self.df['name'] == name].acidity.iloc[0] == 'acidic': - self.add_value_to_df(key=('state_one','z'), - index=index,new_value=0, - overwrite=overwrite) - self.add_value_to_df(key=('state_two','z'), - index=index, - new_value=-1, - overwrite=overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_one','label'), + index = index, + new_value = protonated_label, + overwrite = overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_two','label'), + index = index, + new_value = name, + overwrite = overwrite) + if not _DFm._check_if_df_cell_has_a_value(df=self.df, index=index,key=('state_two','es_type')): + _DFm._add_value_to_df(df = self.df, + key = ('state_two','es_type'), + index = index, + new_value = self.propose_unused_type(), + overwrite = overwrite) + if self.df.loc [self.df['name'] == name].acidity.iloc[0] == 'acidic': + _DFm._add_value_to_df(df = self.df, + key = ('state_one','z'), + index = index, + new_value = 0, + overwrite = overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_two','z'), + index = index, + new_value = -1, + overwrite = overwrite) elif self.df.loc [self.df['name'] == name].acidity.iloc[0] == 'basic': - self.add_value_to_df(key=('state_one','z'), - index=index,new_value=+1, - overwrite=overwrite) - self.add_value_to_df(key=('state_two','z'), - index=index, - 
new_value=0, - overwrite=overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_one','z'), + index = index, + new_value = +1, + overwrite = overwrite) + _DFm._add_value_to_df(df = self.df, + key = ('state_two','z'), + index = index, + new_value = 0, + overwrite = overwrite) self.df.fillna(pd.NA, inplace=True) return @@ -3029,7 +2756,7 @@ def setup_cpH (self, counter_ion, constant_pH, exclusion_range=None, pka_set=Non sucessfull_reactions_labels=[] charge_number_map = self.get_charge_number_map() for name in pka_set.keys(): - if not self._check_if_name_is_defined_in_df(name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f'The acid-base reaction of {name} has not been set up because its particle type is not defined in the pyMBE DataFrame.') continue gamma=10**-pka_set[name]['pka_value'] @@ -3257,7 +2984,7 @@ def setup_grxmc_reactions(self, pH_res, c_salt_res, proton_name, hydroxide_name, sucessful_reactions_labels=[] charge_number_map = self.get_charge_number_map() for name in pka_set.keys(): - if not self._check_if_name_is_defined_in_df(name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f'The acid-base reaction of {name} has not been set up because its particle type is not defined in the dataframe.') continue @@ -3385,7 +3112,7 @@ def setup_grxmc_unified(self, pH_res, c_salt_res, cation_name, anion_name, activ sucessful_reactions_labels=[] charge_number_map = self.get_charge_number_map() for name in pka_set.keys(): - if not self._check_if_name_is_defined_in_df(name): + if not _DFm._check_if_name_is_defined_in_df(name=name, df=self.df): logging.warning(f'The acid-base reaction of {name} has not been set up because its particle type is not defined in the dataframe.') continue @@ -3418,79 +3145,6 @@ def setup_grxmc_unified(self, pH_res, c_salt_res, cation_name, anion_name, activ sucessful_reactions_labels.append(name) return RE, sucessful_reactions_labels, ionic_strength_res - def setup_df (self): - """ - Sets up the pyMBE's dataframe `pymbe.df`. 
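For contrast with the method being removed here, which returned the multi-index column names and assigned the dataframe as a side effect, `_DFm._setup_df()` now simply returns the empty dataframe, which `__init__` and `read_pmb_df` assign to `self.df`. A sketch, assuming the new helper keeps the column layout listed below:

    from pyMBE.storage.df_management import _DFManagement as _DFm

    df = _DFm._setup_df()
    print(df.empty)                                          # True
    print(df.columns.get_level_values(0).unique().tolist())
    # ['name', 'pmb_type', 'particle_id', 'particle_id2', 'residue_id', ...]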
- - Returns: - columns_names(`obj`): pandas multiindex object with the column names of the pyMBE's dataframe - """ - - columns_dtypes = { - 'name': { - '': str}, - 'pmb_type': { - '': str}, - 'particle_id': { - '': pd.Int64Dtype()}, - 'particle_id2': { - '': pd.Int64Dtype()}, - 'residue_id': { - '': pd.Int64Dtype()}, - 'molecule_id': { - '': pd.Int64Dtype()}, - 'acidity': { - '': str}, - 'pka': { - '': object}, - 'central_bead': { - '': object}, - 'side_chains': { - '': object}, - 'residue_list': { - '': object}, - 'model': { - '': str}, - 'sigma': { - '': object}, - 'cutoff': { - '': object}, - 'offset': { - '': object}, - 'epsilon': { - '': object}, - 'state_one': { - 'label': str, - 'es_type': pd.Int64Dtype(), - 'z': pd.Int64Dtype()}, - 'state_two': { - 'label': str, - 'es_type': pd.Int64Dtype(), - 'z': pd.Int64Dtype()}, - 'sequence': { - '': object}, - 'bond_object': { - '': object}, - 'parameters_of_the_potential':{ - '': object}, - 'l0': { - '': float}, - 'node_map':{ - '':object}, - 'chain_map':{ - '':object}} - - self.df = pd.DataFrame(columns=pd.MultiIndex.from_tuples([(col_main, col_sub) for col_main, sub_cols in columns_dtypes.items() for col_sub in sub_cols.keys()])) - - for level1, sub_dtypes in columns_dtypes.items(): - for level2, dtype in sub_dtypes.items(): - self.df[level1, level2] = self.df[level1, level2].astype(dtype) - - columns_names = pd.MultiIndex.from_frame(self.df) - columns_names = columns_names.names - - return columns_names - def setup_lj_interactions(self, espresso_system, shift_potential=True, combining_rule='Lorentz-Berthelot'): """ Sets up the Lennard-Jones (LJ) potential between all pairs of particle types with values for `sigma`, `offset`, and `epsilon` stored in `pymbe.df`. @@ -3550,14 +3204,16 @@ def setup_lj_interactions(self, espresso_system, shift_potential=True, combining self.df.at [index, 'name'] = f'LJ: {label1}-{label2}' lj_params=espresso_system.non_bonded_inter[type_pair[0], type_pair[1]].lennard_jones.get_params() - self.add_value_to_df(index=index, - key=('pmb_type',''), - new_value='LennardJones') - - self.add_value_to_df(index=index, - key=('parameters_of_the_potential',''), - new_value=lj_params, - non_standard_value=True) + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('pmb_type',''), + new_value = 'LennardJones') + + _DFm._add_value_to_df(df = self.df, + index = index, + key = ('parameters_of_the_potential',''), + new_value = lj_params, + non_standard_value = True) if non_parametrized_labels: logging.warning(f'The following particles do not have a defined value of sigma or epsilon in pmb.df: {non_parametrized_labels}. No LJ interaction has been added in ESPResSo for those particles.') return diff --git a/pyMBE/storage/df_management.py b/pyMBE/storage/df_management.py new file mode 100644 index 0000000..f01b950 --- /dev/null +++ b/pyMBE/storage/df_management.py @@ -0,0 +1,483 @@ +# +# Copyright (C) 2023-2025 pyMBE-dev team +# +# This file is part of pyMBE. +# +# pyMBE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# pyMBE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import pandas as pd +import json +import re +import numpy as np +import logging +import warnings + +class _DFManagement: + + class _NumpyEncoder(json.JSONEncoder): + """ + Custom JSON encoder that converts NumPy arrays to Python lists + and NumPy scalars to Python scalars. + """ + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + if isinstance(obj, np.generic): + return obj.item() + return super().default(obj) + + @classmethod + def _add_bond_in_df(cls, df, particle_id1, particle_id2, use_default_bond=False): + """ + Adds a bond entry on the `pymbe.df` storing the particle_ids of the two bonded particles. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + particle_id1(`int`): particle_id of the type of the first particle type of the bonded particles + particle_id2(`int`): particle_id of the type of the second particle type of the bonded particles + use_default_bond(`bool`, optional): Controls if a bond of type `default` is used to bond particle whose bond types are not defined in `pmb.df`. Defaults to False. + + Returns: + df(`DataFrame`): dataframe with pyMBE information with the new bond added. + index(`int`): Row index where the bond information has been added in pmb.df. + """ + particle_name1 = df.loc[(df['particle_id']==particle_id1) & (df['pmb_type']=="particle")].name.values[0] + particle_name2 = df.loc[(df['particle_id']==particle_id2) & (df['pmb_type']=="particle")].name.values[0] + + bond_key = cls._find_bond_key(df = df, + particle_name1 = particle_name1, + particle_name2 = particle_name2, + use_default_bond = use_default_bond) + if not bond_key: + return None + df = cls._copy_df_entry(df = df, + name = bond_key, + column_name = 'particle_id2', + number_of_copies = 1) + indexs = np.where(df['name'] == bond_key) + index_list = list(indexs[0]) + used_bond_df = df.loc[df['particle_id2'].notnull()] + #without this drop the program crashes when dropping duplicates because the 'bond' column is a dict + used_bond_df = used_bond_df.drop([('bond_object','')],axis =1 ) + used_bond_index = used_bond_df.index.to_list() + if not index_list: + return None + for index in index_list: + if index not in used_bond_index: + cls._clean_df_row(df = df, + index = int(index)) + df.at[index,'particle_id'] = particle_id1 + df.at[index,'particle_id2'] = particle_id2 + break + return df, index + + @classmethod + def _add_value_to_df(cls, df, index,key,new_value, non_standard_value=False, overwrite=False): + """ + Adds a value to a cell in the `pmb.df` DataFrame. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + index(`int`): index of the row to add the value to. + key(`str`): the column label to add the value to. + non_standard_value(`bool`, optional): Switch to enable insertion of non-standard values, such as `dict` objects. Defaults to False. + overwrite(`bool`, optional): Switch to enable overwriting of already existing values in pmb.df. Defaults to False. + """ + + token = "#protected:" + + def protect(obj): + if non_standard_value: + return token + json.dumps(obj, cls=cls._NumpyEncoder) + return obj + + def deprotect(obj): + if non_standard_value and isinstance(obj, str) and obj.startswith(token): + return json.loads(obj.removeprefix(token)) + return obj + + # Make sure index is a scalar integer value + index = int(index) + assert isinstance(index, int), '`index` should be a scalar integer value.' 
+ idx = pd.IndexSlice + if cls._check_if_df_cell_has_a_value(df=df, index=index, key=key): + old_value = df.loc[index,idx[key]] + if not pd.Series([protect(old_value)]).equals(pd.Series([protect(new_value)])): + name= df.loc[index,('name','')] + pmb_type= df.loc[index,('pmb_type','')] + logging.debug(f"You are attempting to redefine the properties of {name} of pmb_type {pmb_type}") + if overwrite: + logging.info(f'Overwritting the value of the entry `{key}`: old_value = {old_value} new_value = {new_value}') + if not overwrite: + logging.debug(f"pyMBE has preserved of the entry `{key}`: old_value = {old_value}. If you want to overwrite it with new_value = {new_value}, activate the switch overwrite = True ") + return + + df.loc[index,idx[key]] = protect(new_value) + if non_standard_value: + df[key] = df[key].apply(deprotect) + return + + @classmethod + def _assign_molecule_id(cls, df, molecule_index): + """ + Assigns the `molecule_id` of the pmb object given by `pmb_type` + + Args: + molecule_index(`int`): index of the current `pmb_object_type` to assign the `molecule_id` + Returns: + molecule_id(`int`): Id of the molecule + """ + cls._clean_df_row(df = df, + index = int(molecule_index)) + + if df['molecule_id'].isnull().values.all(): + molecule_id = 0 + else: + molecule_id = df['molecule_id'].max() +1 + cls._add_value_to_df(df = df, + key = ('molecule_id',''), + index = int(molecule_index), + new_value = molecule_id) + return molecule_id + + @staticmethod + def _check_if_df_cell_has_a_value(df, index, key): + """ + Checks if a cell in the `pmb.df` at the specified index and column has a value. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + index(`int`): Index of the row to check. + key(`str`): Column label to check. + + Returns: + `bool`: `True` if the cell has a value, `False` otherwise. + """ + idx = pd.IndexSlice + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + return not pd.isna(df.loc[index, idx[key]]) + + @staticmethod + def _check_if_name_is_defined_in_df(name, df): + """ + Checks if `name` is defined in `pmb.df`. + + Args: + name(`str`): label to check if defined in `pmb.df`. + df(`DataFrame`): dataframe with pyMBE information. + + Returns: + `bool`: `True` for success, `False` otherwise. + """ + return name in df['name'].unique() + + @staticmethod + def _check_if_multiple_pmb_types_for_name(name, pmb_type_to_be_defined, df): + """ + Checks if `name` is defined in `pmb.df` with multiple pmb_types. + + Args: + name(`str`): label to check if defined in `pmb.df`. + pmb_type_to_be_defined(`str`): pmb object type corresponding to `name`. + df(`DataFrame`): dataframe with pyMBE information. + + Returns: + `bool`: `True` for success, `False` otherwise. + """ + if name in df['name'].unique(): + current_object_type = df[df['name']==name].pmb_type.values[0] + if current_object_type != pmb_type_to_be_defined: + raise ValueError (f"The name {name} is already defined in the df with a pmb_type = {current_object_type}, pyMBE does not support objects with the same name but different pmb_types") + + @classmethod + def _clean_df_row(cls, df, index, columns_keys_to_clean=("particle_id", "particle_id2", "residue_id", "molecule_id")): + """ + Cleans the columns of `pmb.df` in `columns_keys_to_clean` of the row with index `index` by assigning them a pd.NA value. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + index(`int`): Index of the row to clean. + columns_keys_to_clean(`list` of `str`, optional): List with the column keys to be cleaned. 
Defaults to [`particle_id`, `particle_id2`, `residue_id`, `molecule_id`]. + """ + for column_key in columns_keys_to_clean: + cls._add_value_to_df(df = df, + key = (column_key,''), + index = index, + new_value = pd.NA) + df.fillna(pd.NA, + inplace = True) + + @staticmethod + def _clean_ids_in_df_row(df, row): + """ + Cleans particle, residue and molecules ids in `row`. + If there are other repeated entries for the same name, drops the row. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + row(pd.DataFrame): A row from the DataFrame to clean. + + Returns: + df(`DataFrame`): dataframe with pyMBE information with cleaned ids in `row` + """ + columns_to_clean = ['particle_id', + 'particle_id2', + 'residue_id', + 'molecule_id'] + if len(df.loc[df['name'] == row['name'].values[0]]) > 1: + df = df.drop(row.index).reset_index(drop=True) + + else: + for column_name in columns_to_clean: + df.loc[row.index, column_name] = pd.NA + return df + + @staticmethod + def _copy_df_entry(df, name, column_name, number_of_copies): + ''' + Creates 'number_of_copies' of a given 'name' in `pymbe.df`. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + name(`str`): Label of the particle/residue/molecule type to be created. `name` must be defined in `pmb.df` + column_name(`str`): Column name to use as a filter. + number_of_copies(`int`): number of copies of `name` to be created. + + Returns: + df(`DataFrame`): dataframe with pyMBE information with the new copies of `name` added. + + Note: + - Currently, column_name only supports "particle_id", "particle_id2", "residue_id" and "molecule_id" + ''' + valid_column_names=["particle_id", "residue_id", "molecule_id", "particle_id2" ] + if column_name not in valid_column_names: + raise ValueError(f"{column_name} is not a valid column_name, currently only the following are supported: {valid_column_names}") + df_by_name = df.loc[df.name == name] + if number_of_copies != 1: + df_by_name_repeated = pd.concat ([df_by_name]*(number_of_copies-1), ignore_index=True) + # Concatenate the new particle rows to `df` + df = pd.concat ([df,df_by_name_repeated], ignore_index=True) + else: + if not df_by_name[column_name].isnull().values.any(): + df_by_name = df_by_name[df_by_name.index == df_by_name.index.min()] + df_by_name_repeated = pd.concat ([df_by_name]*(number_of_copies), ignore_index=True) + df_by_name_repeated[column_name] = pd.NA + df = pd.concat ([df,df_by_name_repeated], ignore_index=True) + return df + + @staticmethod + def _create_variable_with_units(variable, units_registry): + """ + Returns a pint object with the value and units defined in `variable`. + + Args: + variable(`dict` or `str`): {'value': value, 'units': units} + units_registry(`pint.UnitRegistry`): pyMBE UnitRegistry object. + + Returns: + variable_with_units(`obj`): variable with units using the pyMBE UnitRegistry. + """ + if isinstance(variable, dict): + value=variable.pop('value') + units=variable.pop('units') + elif isinstance(variable, str): + value = float(re.split(r'\s+', variable)[0]) + units = re.split(r'\s+', variable)[1] + variable_with_units = value * units_registry(units) + return variable_with_units + + @classmethod + def _convert_columns_to_original_format(cls,df,units_registry): + """ + Converts the columns of the Dataframe to the original format in pyMBE. + + Args: + df(`DataFrame`): dataframe with pyMBE information as a string + units_registry(`pint.UnitRegistry`): pyMBE UnitRegistry object. 
+ + """ + + columns_dtype_int = ['particle_id','particle_id2', 'residue_id','molecule_id', ('state_one','es_type'),('state_two','es_type'),('state_one','z'),('state_two','z') ] + + columns_with_units = ['sigma', 'epsilon', 'cutoff', 'offset'] + + columns_with_list_or_dict = ['residue_list','side_chains', 'parameters_of_the_potential','sequence', 'chain_map', 'node_map'] + + for column_name in columns_dtype_int: + df[column_name] = df[column_name].astype(pd.Int64Dtype()) + + for column_name in columns_with_list_or_dict: + if df[column_name].isnull().all(): + df[column_name] = df[column_name].astype(object) + else: + df[column_name] = df[column_name].apply(lambda x: json.loads(x) if pd.notnull(x) else x) + + for column_name in columns_with_units: + df[column_name] = df[column_name].apply(lambda x: cls._create_variable_with_units(x, units_registry) if pd.notnull(x) else x) + + df['bond_object'] = df['bond_object'].apply(lambda x: cls._convert_str_to_bond_object(x) if pd.notnull(x) else x) + df["l0"] = df["l0"].astype(object) + df["pka"] = df["pka"].astype(object) + + @staticmethod + def _convert_str_to_bond_object(bond_str): + """ + Convert a row read as a `str` to the corresponding ESPResSo bond object. + + Args: + bond_str(`str`): string with the information of a bond object. + + Returns: + bond_object(`obj`): ESPResSo bond object. + + Note: + Currently supported bonds are: HarmonicBond and FeneBond + """ + import espressomd.interactions + + supported_bonds = ['HarmonicBond', 'FeneBond'] + m = re.search(r'^([A-Za-z0-9_]+)\((\{.+\})\)$', bond_str) + if m is None: + raise ValueError(f'Cannot parse bond "{bond_str}"') + bond = m.group(1) + if bond not in supported_bonds: + raise NotImplementedError(f"Bond type '{bond}' currently not implemented in pyMBE, accepted types are {supported_bonds}") + params = json.loads(m.group(2)) + bond_id = params.pop("bond_id") + bond_object = getattr(espressomd.interactions, bond)(**params) + bond_object._bond_id = bond_id + return bond_object + + @staticmethod + def _delete_entries_in_df(df, entry_name): + """ + Deletes entries with name `entry_name` from the DataFrame if it exists. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + entry_name (`str`): The name of the entry in the dataframe to delete. + + Returns: + df(`DataFrame`): dataframe with pyMBE information with the entry deleted. + """ + if entry_name in df["name"].values: + df = df[df["name"] != entry_name].reset_index(drop=True) + return df + + @staticmethod + def _find_bond_key(df, particle_name1, particle_name2, use_default_bond=False): + """ + Searches for the `name` of the bond between `particle_name1` and `particle_name2` in `pymbe.df` and returns it. + + Args: + df(`DataFrame`): dataframe with pyMBE information. + particle_name1(`str`): label of the type of the first particle type of the bonded particles. + particle_name2(`str`): label of the type of the second particle type of the bonded particles. + use_default_bond(`bool`, optional): If it is activated, the "default" bond is returned if no bond is found between `particle_name1` and `particle_name2`. Defaults to 'False'. + + Returns: + bond_key (str): `name` of the bond between `particle_name1` and `particle_name2` if a matching bond exists + + Note: + - If `use_default_bond`=`True`, it returns "default" if no key is found. 
+ """ + bond_keys = [f'{particle_name1}-{particle_name2}', f'{particle_name2}-{particle_name1}'] + bond_defined=False + for bond_key in bond_keys: + if bond_key in df["name"].values: + bond_defined=True + correct_key=bond_key + break + if bond_defined: + return correct_key + elif use_default_bond: + return 'default' + else: + return None + + @staticmethod + def _setup_df(): + """ + Sets up the pyMBE's dataframe `pymbe.df`. + + Returns: + columns_names(`obj`): pandas multiindex object with the column names of the pyMBE's dataframe + """ + + columns_dtypes = { + 'name': { + '': str}, + 'pmb_type': { + '': str}, + 'particle_id': { + '': pd.Int64Dtype()}, + 'particle_id2': { + '': pd.Int64Dtype()}, + 'residue_id': { + '': pd.Int64Dtype()}, + 'molecule_id': { + '': pd.Int64Dtype()}, + 'acidity': { + '': str}, + 'pka': { + '': object}, + 'central_bead': { + '': object}, + 'side_chains': { + '': object}, + 'residue_list': { + '': object}, + 'model': { + '': str}, + 'sigma': { + '': object}, + 'cutoff': { + '': object}, + 'offset': { + '': object}, + 'epsilon': { + '': object}, + 'state_one': { + 'label': str, + 'es_type': pd.Int64Dtype(), + 'z': pd.Int64Dtype()}, + 'state_two': { + 'label': str, + 'es_type': pd.Int64Dtype(), + 'z': pd.Int64Dtype()}, + 'sequence': { + '': object}, + 'bond_object': { + '': object}, + 'parameters_of_the_potential':{ + '': object}, + 'l0': { + '': float}, + 'node_map':{ + '':object}, + 'chain_map':{ + '':object}} + + df = pd.DataFrame(columns=pd.MultiIndex.from_tuples([(col_main, col_sub) for col_main, sub_cols in columns_dtypes.items() for col_sub in sub_cols.keys()])) + + for level1, sub_dtypes in columns_dtypes.items(): + for level2, dtype in sub_dtypes.items(): + df[level1, level2] = df[level1, level2].astype(dtype) + + columns_names = pd.MultiIndex.from_frame(df) + columns_names = columns_names.names + + return df \ No newline at end of file diff --git a/testsuite/bond_tests.py b/testsuite/bond_tests.py index cced0a6..2454bf6 100644 --- a/testsuite/bond_tests.py +++ b/testsuite/bond_tests.py @@ -24,7 +24,7 @@ import json import io import logging - +import pyMBE.storage.df_management as df_management # Create an in-memory log stream log_stream = io.StringIO() @@ -38,7 +38,7 @@ class Test(ut.TestCase): def setUp(self): - pmb.setup_df() + pmb.df = df_management._DFManagement._setup_df() def check_bond_setup(self, bond_object, input_parameters, bond_type): """ @@ -91,7 +91,7 @@ def test_bond_harmonic(self): # check bond deserialization bond_params = bond_object.get_params() bond_params["bond_id"] = bond_object._bond_id - deserialized = pmb.convert_str_to_bond_object( + deserialized = df_management._DFManagement._convert_str_to_bond_object( f'{bond_object.__class__.__name__}({json.dumps(bond_params)})') self.check_bond_setup(bond_object=deserialized, input_parameters=bond, @@ -130,7 +130,7 @@ def test_bond_fene(self): # check bond deserialization bond_params = bond_object.get_params() bond_params["bond_id"] = bond_object._bond_id - deserialized = pmb.convert_str_to_bond_object( + deserialized = df_management._DFManagement._convert_str_to_bond_object( f'{bond_object.__class__.__name__}({json.dumps(bond_params)})') self.check_bond_setup(bond_object=deserialized, input_parameters=bond, @@ -278,22 +278,22 @@ def test_bond_framework(self): # check deserialization exceptions with self.assertRaises(ValueError): - pmb.convert_str_to_bond_object('Not_A_Bond()') + df_management._DFManagement._convert_str_to_bond_object('Not_A_Bond()') with 
self.assertRaises(json.decoder.JSONDecodeError): - pmb.convert_str_to_bond_object('HarmonicBond({invalid_json})') + df_management._DFManagement._convert_str_to_bond_object('HarmonicBond({invalid_json})') with self.assertRaises(NotImplementedError): - pmb.convert_str_to_bond_object('QuarticBond({"r_0": 1., "k": 1.})') + df_management._DFManagement._convert_str_to_bond_object('QuarticBond({"r_0": 1., "k": 1.})') # check bond keys - self.assertEqual(pmb.find_bond_key('A', 'A'), 'A-A') - self.assertEqual(pmb.find_bond_key('B', 'B'), 'B-B') - self.assertEqual(pmb.find_bond_key('A', 'A', use_default_bond=True), 'A-A') - self.assertEqual(pmb.find_bond_key('Z', 'Z', use_default_bond=True), 'default') - self.assertIsNone(pmb.find_bond_key('A', 'B')) - self.assertIsNone(pmb.find_bond_key('B', 'A')) - self.assertIsNone(pmb.find_bond_key('Z', 'Z')) - self.assertEqual(pmb.find_bond_key('A', 'B', use_default_bond=True), 'default') - + self.assertEqual(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'A', particle_name2 = 'A'), 'A-A') + self.assertEqual(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'B', particle_name2 = 'B'), 'B-B') + self.assertEqual(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'A', particle_name2 = 'A', use_default_bond=True), 'A-A') + self.assertEqual(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'Z', particle_name2 = 'Z', use_default_bond=True), 'default') + self.assertIsNone(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'A', particle_name2 = 'B')) + self.assertIsNone(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'B', particle_name2 = 'A')) + self.assertIsNone(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'Z', particle_name2 = 'Z')) + self.assertEqual(df_management._DFManagement._find_bond_key(df = pmb.df, particle_name1 = 'A', particle_name2 = 'B', use_default_bond=True), 'default') + self.assertIsNone(pmb.search_bond('A', 'B', hard_check=False)) log_contents = log_stream.getvalue() self.assertIn("Bond not defined between particles A and B", log_contents) @@ -305,12 +305,16 @@ def test_bond_framework(self): pmb.search_bond('A', 'B' , hard_check=True) # check invalid bond index - pmb.add_value_to_df(key=('particle_id',''), new_value=10, - index=np.where(pmb.df['name']=='A')[0][0]) - pmb.add_value_to_df(key=('particle_id',''), new_value=20, - index=np.where(pmb.df['name']=='B')[0][0]) - self.assertIsNone(pmb.add_bond_in_df(10, 20, use_default_bond=False)) - self.assertIsNone(pmb.add_bond_in_df(10, 20, use_default_bond=True)) + df_management._DFManagement._add_value_to_df(df = pmb.df, + key = ('particle_id',''), + new_value = 10, + index = np.where(pmb.df['name']=='A')[0][0]) + df_management._DFManagement._add_value_to_df(df = pmb.df, + key = ('particle_id',''), + new_value = 20, + index = np.where(pmb.df['name']=='B')[0][0]) + self.assertIsNone(df_management._DFManagement._add_bond_in_df(pmb.df, 10, 20, use_default_bond=False)) + self.assertIsNone(df_management._DFManagement._add_bond_in_df(pmb.df, 10, 20, use_default_bond=True)) # check bond lengths self.assertAlmostEqual(pmb.get_bond_length('A', 'A'), diff --git a/testsuite/charge_number_map_tests.py b/testsuite/charge_number_map_tests.py index 4fba965..917bfd5 100644 --- a/testsuite/charge_number_map_tests.py +++ b/testsuite/charge_number_map_tests.py @@ -19,6 +19,7 @@ # Import pyMBE and other libraries import pyMBE import numpy as np +import 
pyMBE.storage.df_management as df_management # Create an instance of pyMBE library pmb = pyMBE.pymbe_library(seed=42) @@ -49,7 +50,7 @@ def check_charge_number_map(input_parameters): print("*** get_charge_number_map unit tests ***") print("*** Unit test: check that get_charge_number_map works correctly for inert particles***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"I", "acidity": "inert", "pka": np.nan, @@ -60,7 +61,7 @@ def check_charge_number_map(input_parameters): print("*** Unit test passed ***") print("*** Unit test: check that get_charge_number_map works correctly for acidic particles***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"A", "acidity": "acidic", "pka":4} @@ -70,7 +71,7 @@ def check_charge_number_map(input_parameters): print("*** Unit test passed ***") print("*** Unit test: check that get_charge_number_map works correctly for basic particles***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"B", "acidity": "basic", "pka":4} diff --git a/testsuite/lj_tests.py b/testsuite/lj_tests.py index d6e7310..201a6ed 100644 --- a/testsuite/lj_tests.py +++ b/testsuite/lj_tests.py @@ -18,8 +18,8 @@ # Import pyMBE and other libraries import pyMBE +import pyMBE.storage.df_management as df_management import numpy as np - import logging import io # Create an in-memory log stream @@ -48,7 +48,7 @@ print("*** Unit test passed ***") print("*** Unit test: check that `offset` defaults to 0***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() # Define dummy particle pmb.define_particle(name="A") @@ -59,7 +59,7 @@ print("*** Unit test: check that `cutoff` defaults to `2**(1./6.) 
reduced_length` ***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() # Define dummy particle pmb.define_particle(name="A") @@ -95,7 +95,7 @@ print("*** Unit test: test that setup_lj_interactions sets up inert particles correctly ***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() # Define particles A_input_parameters={"name":"A", "sigma":1*pmb.units.nm, @@ -130,7 +130,8 @@ log_contents = log_stream.getvalue() assert "The following particles do not have a defined value of sigma or epsilon" in log_contents -pmb.delete_entries_in_df("X") +df_management._DFManagement._delete_entries_in_df(df=pmb.df, + entry_name="X") # ValueError if combining-rule other than Lorentz_-Berthelot is used input_params = {"espresso_system":espresso_system, "combining_rule": "Geometric"} diff --git a/testsuite/parameter_test.py b/testsuite/parameter_test.py index 458102a..a2825df 100644 --- a/testsuite/parameter_test.py +++ b/testsuite/parameter_test.py @@ -20,6 +20,7 @@ import pyMBE import pandas as pd import numpy as np +import pyMBE.storage.df_management as df_management pmb = pyMBE.pymbe_library(seed=42) @@ -41,7 +42,7 @@ path_to_pka=pmb.root / "parameters" / "pka_sets" / "Hass2015.json" # First order of loading parameters -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df pmb.load_interaction_parameters (filename=peptides_root / "Lunkad2021.json") pmb.load_pka_set(filename=pka_root / "Hass2015.json") df_1 = pmb.df.copy() @@ -51,7 +52,7 @@ # Drop bond_object (assert_frame_equal does not process it well) df_1 = df_1.sort_index(axis=1).drop(labels="bond_object", axis=1) # Second order of loading parameters -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df pmb.load_pka_set (filename=path_to_pka) #print(pmb.df["acidity"]) pmb.load_interaction_parameters(filename=path_to_interactions) @@ -70,7 +71,7 @@ print("*** Test passed ***") print("*** Unit test: check that load_interaction_parameters loads FENE bonds correctly ***") -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df pmb.load_interaction_parameters (filename=data_root / "test_FENE.json") expected_parameters = {'r_0' : 0.4*pmb.units.nm, @@ -89,7 +90,7 @@ print("*** Test passed ***") print("*** Unit test: check that load_interaction_parameters loads residue, molecule and peptide objects correctly ***") -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df pmb.load_interaction_parameters (filename=data_root / "test_molecules.json") expected_residue_parameters={"central_bead": "A", "side_chains": ["B","C"] } @@ -117,12 +118,12 @@ verbose=True) print("*** Test passed ***") print("*** Unit test: check that load_interaction_parameters raises a ValueError if one loads a data set with an unknown pmb_type ***") -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df input_parameters={"filename": data_root / "test_non_valid_object.json"} np.testing.assert_raises(ValueError, pmb.load_interaction_parameters, **input_parameters) print("*** Test passed ***") print("*** Unit test: check that load_interaction_parameters raises a ValueError if one loads a bond not supported by pyMBE ***") -pmb.setup_df() # clear the pmb_df +pmb.df = df_management._DFManagement._setup_df() # clear the pmb_df input_parameters={"filename": data_root / 
"test_non_valid_bond.json"} np.testing.assert_raises(ValueError, pmb.load_interaction_parameters, **input_parameters) print("*** Test passed ***") diff --git a/testsuite/read-write-df_test.py b/testsuite/read-write-df_test.py index 455ac98..0ca1187 100644 --- a/testsuite/read-write-df_test.py +++ b/testsuite/read-write-df_test.py @@ -22,6 +22,7 @@ import numpy as np import logging import io +import pyMBE.storage.df_management as df_management # Create an in-memory log stream log_stream = io.StringIO() @@ -168,5 +169,5 @@ # Test that copy_df_entry raises an error if one provides a non-valid column name print("*** Unit test: check that copy_df_entry raises an error if the entry does not exist ***") -np.testing.assert_raises(ValueError, pmb.copy_df_entry, name='test', column_name='non_existing_column',number_of_copies=1) +np.testing.assert_raises(ValueError, df_management._DFManagement._copy_df_entry, df = pmb.df, name='test', column_name='non_existing_column',number_of_copies=1) print("*** Unit test passed***") \ No newline at end of file diff --git a/testsuite/serialization_test.py b/testsuite/serialization_test.py index 9315beb..e1a0ea3 100644 --- a/testsuite/serialization_test.py +++ b/testsuite/serialization_test.py @@ -23,12 +23,12 @@ import pyMBE import pyMBE.lib.analysis import scipy.constants - +import pyMBE.storage.df_management as df_management class Serialization(ut.TestCase): def test_json_encoder(self): - encoder = pyMBE.pymbe_library.NumpyEncoder + encoder = df_management._DFManagement._NumpyEncoder # Python types self.assertEqual(json.dumps(1, cls=encoder), "1") self.assertEqual(json.dumps([1, 2], cls=encoder), "[1, 2]") diff --git a/testsuite/set_particle_acidity_test.py b/testsuite/set_particle_acidity_test.py index a813776..af628b7 100644 --- a/testsuite/set_particle_acidity_test.py +++ b/testsuite/set_particle_acidity_test.py @@ -20,6 +20,7 @@ import numpy as np import pandas as pd import pyMBE +import pyMBE.storage.df_management as df_management # Create an instance of pyMBE library pmb = pyMBE.pymbe_library(seed=42) @@ -71,7 +72,7 @@ def check_acid_base_setup(input_parameters, acidity_setup): print("*** Particle acidity unit tests ***") print("*** Unit test: check that all acid/base input parameters in define_particle for an inert particle are correctly stored in pmb.df***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"I", "acidity": pd.NA, "pka": pd.NA, @@ -87,7 +88,7 @@ def check_acid_base_setup(input_parameters, acidity_setup): print("*** Unit test passed ***") print("*** Unit test: check that a deprecation warning is raised if the keyword 'inert' is used for acidity ***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"I", "acidity": "inert", "pka": pd.NA, @@ -96,7 +97,7 @@ def check_acid_base_setup(input_parameters, acidity_setup): print("*** Unit test passed ***") print("*** Unit test: check that all acid/base input parameters in define_particle for an acid are correctly stored in pmb.df***") # Clean pmb.df -pmb.setup_df() +pmb.df = df_management._DFManagement._setup_df() input_parameters={"name":"A", "acidity": "acidic", "pka":4} @@ -110,7 +111,7 @@ def check_acid_base_setup(input_parameters, acidity_setup): print("*** Unit test passed ***") print("*** Unit test: check that all acid/base input parameters in define_particle for a base are correctly stored in pmb.df***") # Clean pmb.df -pmb.setup_df() +pmb.df = 
df_management._DFManagement._setup_df() input_parameters={"name":"B", "acidity": "basic", "pka":9} diff --git a/testsuite/test_in_out_pmb_df.py b/testsuite/test_in_out_pmb_df.py index 7cdbbdc..f614a12 100644 --- a/testsuite/test_in_out_pmb_df.py +++ b/testsuite/test_in_out_pmb_df.py @@ -22,6 +22,7 @@ import unittest as ut import logging import re +import pyMBE.storage.df_management as df_management # Create an in-memory log stream log_stream = io.StringIO() @@ -61,7 +62,10 @@ def test_add_to_df(self): new_value='T2' name=pmb.df.loc[index,key] pmb_type=pmb.df.loc[index,('pmb_type','')] - pmb.add_value_to_df(index=index,key=key,new_value=new_value) + df_management._DFManagement._add_value_to_df(df = pmb.df, + index = index, + key = key, + new_value = new_value) log_contents = log_stream.getvalue() warning_msg1=f"You are attempting to redefine the properties of {name} of pmb_type {pmb_type}" warning_msg2=f"pyMBE has preserved of the entry `{key}`: old_value = {old_value}. If you want to overwrite it with new_value = {new_value}, activate the switch overwrite = True" @@ -76,7 +80,11 @@ def test_add_to_df(self): new_value='T2' name=pmb.df.loc[index,key] pmb_type=pmb.df.loc[index,('pmb_type','')] - pmb.add_value_to_df(index=index,key=key,new_value=new_value,overwrite=True) + df_management._DFManagement._add_value_to_df(df = pmb.df, + index = index, + key = key, + new_value = new_value, + overwrite = True) log_contents = log_stream.getvalue() warning_msg1=f"You are attempting to redefine the properties of {name} of pmb_type {pmb_type}" warning_msg2=f"Overwritting the value of the entry `{key}`: old_value = {old_value} new_value = {new_value}" @@ -88,9 +96,11 @@ def test_add_to_df(self): def test_delete_entries_df(self): print("*** Unit test: test that entries in df are deleted properly using `delete_entries_in_df` ***") - pmb.delete_entries_in_df(entry_name="S1-S2") + pmb.df = df_management._DFManagement._delete_entries_in_df(df=pmb.df, + entry_name="S1-S2") assert pmb.df[pmb.df["name"]=="S1-S2"].empty - pmb.delete_entries_in_df(entry_name="S1") + pmb.df = df_management._DFManagement._delete_entries_in_df(df=pmb.df, + entry_name="S1") assert pmb.df[pmb.df["name"]=="S1"].empty residue_parameters={"R1":{"name": "R1", @@ -103,7 +113,8 @@ def test_delete_entries_df(self): for parameter_set in residue_parameters.values(): pmb.define_residue(**parameter_set) - pmb.delete_entries_in_df(entry_name="R1") + pmb.df = df_management._DFManagement._delete_entries_in_df(df=pmb.df, + entry_name="R1") assert pmb.df[pmb.df["name"]=="R1"].empty molecule_parameters={"M1":{"name": "M1", @@ -112,7 +123,8 @@ def test_delete_entries_df(self): for parameter_set in molecule_parameters.values(): pmb.define_molecule(**parameter_set) - pmb.delete_entries_in_df(entry_name="M1") + pmb.df = df_management._DFManagement._delete_entries_in_df(df=pmb.df, + entry_name="M1") assert pmb.df[pmb.df["name"]=="M1"].empty print("*** Unit passed ***")
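
The refactor above makes the DataFrame helpers stateless: every helper receives the dataframe explicitly and the caller keeps ownership of `pmb.df`. Below is a minimal usage sketch following the calling pattern of the updated test suite; the particle name "A" and the particle_id value 10 are illustrative assumptions, not values prescribed by the patch.

import numpy as np
import pyMBE
import pyMBE.storage.df_management as df_management

_DFm = df_management._DFManagement

pmb = pyMBE.pymbe_library(seed=42)

# Resetting the dataframe is now an explicit assignment instead of a method
# that mutates hidden state inside the pyMBE object.
pmb.df = _DFm._setup_df()

pmb.define_particle(name="A")
index = np.where(pmb.df['name'] == 'A')[0][0]

# Every helper takes the dataframe as an explicit argument.
_DFm._add_value_to_df(df=pmb.df,
                      index=index,
                      key=('particle_id', ''),
                      new_value=10)

assert _DFm._check_if_name_is_defined_in_df(name="A", df=pmb.df)
assert _DFm._check_if_df_cell_has_a_value(df=pmb.df,
                                          index=index,
                                          key=('particle_id', ''))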
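
Two behaviors of the private API are worth noting, both exercised in test_in_out_pmb_df.py above: `_add_value_to_df` only replaces an already existing value when `overwrite=True`, and `_delete_entries_in_df` returns a new dataframe that the caller must assign back. A minimal sketch, reusing the illustrative particle "A"; the replacement label "A_new" is an assumption.

import numpy as np
import pyMBE
import pyMBE.storage.df_management as df_management

_DFm = df_management._DFManagement

pmb = pyMBE.pymbe_library(seed=42)
pmb.define_particle(name="A")
index = np.where(pmb.df['name'] == 'A')[0][0]

# By default an existing value is preserved and a debug message is logged.
_DFm._add_value_to_df(df=pmb.df, index=index,
                      key=('state_one', 'label'), new_value='A_new')
assert pmb.df.loc[index, ('state_one', 'label')] == 'A'

# With overwrite=True the new value replaces the old one.
_DFm._add_value_to_df(df=pmb.df, index=index,
                      key=('state_one', 'label'), new_value='A_new',
                      overwrite=True)
assert pmb.df.loc[index, ('state_one', 'label')] == 'A_new'

# _delete_entries_in_df does not mutate in place; the caller reassigns pmb.df.
pmb.df = _DFm._delete_entries_in_df(df=pmb.df, entry_name="A")
assert pmb.df[pmb.df['name'] == 'A'].empty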
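
The relocated `_NumpyEncoder` is what `_add_value_to_df` uses (via `non_standard_value=True`) to serialize values such as NumPy arrays before storing them in the dataframe. A small sketch of the encoder on its own, mirroring serialization_test.py above; the NumPy inputs are illustrative.

import json
import numpy as np
import pyMBE.storage.df_management as df_management

encoder = df_management._DFManagement._NumpyEncoder

# NumPy scalars and arrays are converted to plain Python objects before dumping.
assert json.dumps(np.int32(1), cls=encoder) == "1"
assert json.dumps(np.array([1, 2]), cls=encoder) == "[1, 2]"

# Plain Python types pass through unchanged.
assert json.dumps({"k": 400.0}, cls=encoder) == '{"k": 400.0}'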
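
Bond deserialization also lives in the private module now. The sketch below mirrors the round-trip used in bond_tests.py above; it assumes a working ESPResSo installation, and the harmonic-bond parameters and bond_id are illustrative.

import json
import espressomd.interactions
import pyMBE.storage.df_management as df_management

_DFm = df_management._DFManagement

bond = espressomd.interactions.HarmonicBond(k=400.0, r_0=0.4)
bond._bond_id = 0          # pyMBE tags bond objects with a private id
params = bond.get_params()
params["bond_id"] = bond._bond_id

# Serialize to the "BondType({...})" string stored in pmb.df, then rebuild it.
restored = _DFm._convert_str_to_bond_object(
    f'{bond.__class__.__name__}({json.dumps(params)})')
assert isinstance(restored, espressomd.interactions.HarmonicBond)
assert restored._bond_id == 0

# Unsupported bond types raise NotImplementedError, as checked in the tests:
# _DFm._convert_str_to_bond_object('QuarticBond({"r_0": 1.0, "k": 1.0})')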