Source code for arbdmodel.ibi

from abc import ABCMeta
from scipy.signal import savgol_filter as savgol
import numpy as np
from pathlib import Path
from . import ArbdModel
from . import logger
from .interactions import AbstractPotential

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm


""" This file includes various routines to simplify running
simulations with Iterative Boltmann Inversion. """


class DegreeOfFreedom():
    """ Base class for representing a degree of freedom in the
    system. Concrete examples include the distance, angle, or
    dihedral angle between nearby "bonded" interaction sites, or the
    distances between pairs of particle types (with exclusions!).
    Derived classes should calculate a value for the DoF from
    particle coordinates. """

    def __init__(self, *particles):
        self.particles = particles
        # self.ids = [p.idx for p in particles]
        self.sels = dict()
        self.current_key = None

    def _particle_to_sel(particle, universe):
        """ Recursive function to convert particle or group_site to
        selection or list of selections """
        if isinstance(particle, ArbdModel._GroupSite):
            return [DegreeOfFreedom._particle_to_sel(p, universe)
                    for p in particle.particles]
        else:
            return universe.select_atoms( "index {}".format(particle.idx) )

    def _sel_list_to_positions(sel_list):
        """ Recursive function to convert list of selections into positions """
        positions = []
        for sel in sel_list:
            if isinstance(sel, list):
                ## Group sites contribute the mean position of their member selections
                positions.append( np.array(DegreeOfFreedom._sel_list_to_positions(sel)).mean(axis=0) )
            else:
                positions.append( sel.positions.mean(axis=0) )
        return positions

    def wrap_vector(self, v):
        """ Apply the minimum image convention; only orthorhombic boxes are supported """
        box = self.sels[self.current_key][0].universe.dimensions
        assert( np.all( np.array(box[3:]) == 90 ) )
        for i in range(3):
            sl = (v[...,i] > 0.5*box[i])
            v[sl,i] = v[sl,i] - box[i]
            sl = (v[...,i] < -0.5*box[i])
            v[sl,i] = v[sl,i] + box[i]
        return v

    def get_values(self, universe):
        key = universe.__hash__()
        if key not in self.sels:
            self.sels[key] = [DegreeOfFreedom._particle_to_sel(p, universe)
                              for p in self.particles]
        positions = DegreeOfFreedom._sel_list_to_positions( self.sels[key] )
        self.current_key = key
        result = [self._get_value(positions)]
        assert( np.all( ~np.isnan(result) ) )
        assert( len(result) > 0 )
        self.current_key = None
        return result

    def compute_volume(self, bins):
        return np.diff(bins)


class RadiusDof(DegreeOfFreedom):
    def _get_value(self, positions):
        for p in positions:
            assert( len(p) == 3 )
        ri, = positions
        dist = np.linalg.norm(ri)
        return dist

    def compute_volume(self, bins):
        ## Volume of the spherical shell spanning each bin
        return (4.0/3)*np.pi*(bins[1:]**3 - bins[:-1]**3)


class BondDof(DegreeOfFreedom):
    def _get_value(self, positions):
        for p in positions:
            assert( len(p) == 3 )
        ri, rj = positions
        dist = np.linalg.norm( self.wrap_vector(rj-ri) )
        return dist

    def compute_volume(self, bins):
        ## Volume of the spherical shell spanning each bin
        return (4.0/3)*np.pi*(bins[1:]**3 - bins[:-1]**3)


class AngleDof(DegreeOfFreedom):
    def _get_value(self, positions):
        for p in positions:
            assert( len(p) == 3 )
        ri, rj, rk = positions
        rji = self.wrap_vector(ri-rj)
        rjk = self.wrap_vector(rk-rj)
        cos = rji.dot(rjk) / (np.linalg.norm(rji)*np.linalg.norm(rjk))
        angle = np.arccos(cos)
        angle = angle*180/np.pi
        return angle

    def compute_volume(self, bins):
        ## sin(theta) Jacobian for angles sampled on a sphere, averaged across each bin
        sin = np.sin(bins*np.pi/180)
        return 0.5*(sin[1:] + sin[:-1])


class DihedralDof(DegreeOfFreedom):
    def _get_value(self, positions):
        for p in positions:
            assert( len(p) == 3 )
        posa, posb, posc, posd = positions
        ab = self.wrap_vector(posa - posb)
        bc = self.wrap_vector(posb - posc)
        cd = self.wrap_vector(posc - posd)
        crossABC = np.cross(ab, bc)
        crossBCD = np.cross(bc, cd)
        crossX = np.cross(bc, crossABC)
        cos_phi = crossABC.dot(crossBCD) / (np.linalg.norm(crossABC) * np.linalg.norm(crossBCD))
        sin_phi = crossX.dot(crossBCD) / (np.linalg.norm(crossX) * np.linalg.norm(crossBCD))
        angle = -np.arctan2(sin_phi, cos_phi)
        angle = angle*180/np.pi
        return angle


class BondAngleDof(DegreeOfFreedom):
    def _get_value(self, positions):
        raise NotImplementedError
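

## Illustrative sketch (not part of the library API): evaluating a bonded DoF
## over a trajectory. Assumes `universe` is an MDAnalysis Universe whose atom
## indices match the particles' `.idx` attributes; `p1` and `p2` are
## hypothetical arbdmodel particle objects.
def _example_bond_series(universe, p1, p2):
    """ Collect the PBC-wrapped distance between two particles for each frame """
    dof = BondDof(p1, p2)
    return [dof.get_values(universe)[0] for ts in universe.trajectory]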


class PairDistributionDof():
    def __init__(self, particles_A, particles_B, range_=(0,50), resolution=0.1, exclusions=None):
        if exclusions is None:
            exclusions = []
        self.particles_A = particles_A
        self.particles_B = particles_B
        self.range_ = range_
        self.resolution = resolution
        self.exclusions = exclusions
        self.sel_A = dict()
        self.sel_B = dict()
        self.exclusions_mask = dict()

    def _particles_to_sel(particles, universe):
        for particle in particles:
            if isinstance(particle, ArbdModel._GroupSite):
                raise NotImplementedError
        sel_string = f'index {" ".join([str(p.idx) for p in particles])}'
        return universe.select_atoms( sel_string )

    def _build_exclusion_matrix(self, universe):
        key = universe.__hash__()
        if key in self.exclusions_mask:
            return self.exclusions_mask[key]
        logger.info('Building exclusions')
        n_atoms = len(universe.atoms)
        A, B = self.sel_A[key], self.sel_B[key]
        ex = self.exclusions_mask[key] = dict()
        ## Map universe atom indices to positions within each selection
        _index_to_sel_order_A = -np.ones(n_atoms, dtype=int)
        _index_to_sel_order_B = -np.ones(n_atoms, dtype=int)
        for i,I in enumerate(A.atoms.indices):
            _index_to_sel_order_A[I] = i
            ex[(i,i)] = 1
        for i,I in enumerate(B.atoms.indices):
            _index_to_sel_order_B[I] = i
            ex[(i,i)] = 1
        for p1,p2 in self.exclusions:
            _ids = [(_index_to_sel_order_A[p1.idx], _index_to_sel_order_B[p2.idx]),
                    (_index_to_sel_order_A[p2.idx], _index_to_sel_order_B[p1.idx])]
            for i,j in _ids:
                if i < 0 or j < 0: continue
                ex[(i,j)] = ex[(j,i)] = 1
        logger.info('Done building exclusions')
        return ex

    def get_values(self, universe):
        from MDAnalysis.lib import distances
        key = universe.__hash__()
        if key not in self.sel_A:
            self.sel_A[key] = PairDistributionDof._particles_to_sel(self.particles_A, universe)
        if key not in self.sel_B:
            self.sel_B[key] = PairDistributionDof._particles_to_sel(self.particles_B, universe)
        A = self.sel_A[key]
        B = self.sel_B[key]
        _pairs, _dist = distances.capped_distance( A.positions, B.positions,
                                                   self.range_[1] + self.resolution,
                                                   box = universe.dimensions )
        ex = self._build_exclusion_matrix(universe)
        return [d for p,d in zip(_pairs,_dist) if tuple(p) not in ex]

    def compute_volume(self, bins):
        ## Volume of the spherical shell spanning each bin
        return (4.0/3)*np.pi*(bins[1:]**3 - bins[:-1]**3)
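

## Illustrative sketch of how a PairDistributionDof's values are turned into a
## shell-volume-normalized distribution, mirroring the logic in
## AbstractIBIpotential._extract_distribution below. `universe` is assumed to
## be an MDAnalysis Universe and `pair_dof` a PairDistributionDof instance.
def _example_pair_histogram(universe, pair_dof):
    """ Accumulate pair distances into the same bins used by the IBI machinery """
    bins = np.arange(pair_dof.range_[0],
                     pair_dof.range_[1] + pair_dof.resolution,
                     pair_dof.resolution)
    counts = np.zeros(len(bins)-1)
    for ts in universe.trajectory:
        d = np.asarray(pair_dof.get_values(universe))
        inds = np.digitize(d, bins) - 1
        inds = inds[(inds >= 0) & (inds < len(counts))]
        counts += np.bincount(inds, minlength=len(counts))
    ## Normalize by the number of frames and the volume of each bin's shell
    return bins, counts / (len(universe.trajectory) * pair_dof.compute_volume(bins))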


class AbstractIBIpotential(AbstractPotential, metaclass=ABCMeta):
    """ A class that implements the Iterative Boltzmann Inversion
    method for generating potentials.

    This abstract class provides the foundation for creating IBI
    potentials that are derived from target distributions. IBI is a
    method to derive effective pair potentials from radial
    distribution functions in molecular systems.

    Parameters
    ----------
    name : str
        Name of the potential, used for file naming.
    degrees_of_freedom : list
        List of degrees of freedom to consider in the potential.
    range_ : tuple, optional
        Range of distances to consider (min, max), default is (0, 30).
    resolution : float, optional
        Resolution of the binning for the distribution, default is 0.1.
    max_force : float, optional
        Maximum force allowed in the potential.
    max_potential : float, optional
        Maximum potential value allowed.
    out_of_bounds_force : str or float, optional
        Force to apply outside the well-defined density region, default is 'max_force'.
    zero : str, optional
        Method to set the zero point of the potential, default is 'last'.
    smooth : int or None, optional
        Window size for Savitzky-Golay filter; if None, it will be calculated automatically.
    learning_rate : float or callable, optional
        Learning rate for potential updates, default is 0.9.
    iteration : int, optional
        Current iteration number, default is 1.
    filename_prefix : str, optional
        Path prefix for saving potentials, default is 'IBIPotentials/'.

    Attributes
    ----------
    bins : numpy.ndarray
        Bin edges for the distribution.
    potential : numpy.ndarray
        Current potential values.

    Methods
    -------
    potential(r)
        Abstract method to calculate potential at distance r.
    filename(types=None, iteration=None, smoothed=True)
        Generate filename for the potential.
    write_file()
        Write the potential to a file.
    get_target_distribution(universe=None, trajectory=None, recalculate=False)
        Get or calculate the target distribution.
    get_cg_distribution(universe, trajectory=None, box=None, recalculate=False)
        Get or calculate the coarse-grained distribution.
    read_cg_potential(iteration=None)
        Read a potential from a file.
    write_cg_potential(universe=None, scaling_factor=None, temperature=295, tol=None, clean_edges=True, box=None)
        Calculate and save the potential based on distributions.

    Notes
    -----
    IBI is an iterative method that refines potentials to match
    target distributions. The process involves:

    1. Starting with an initial guess (usually Boltzmann inversion of the target)
    2. Running simulations with the current potential
    3. Comparing resulting distributions with the target
    4. Updating the potential based on the difference
    5. Repeating until convergence
    """

    def __init__(self, name, degrees_of_freedom=[], range_=(0,30), resolution=0.1,
                 max_force=None, max_potential=None, out_of_bounds_force='max_force',
                 zero='last', smooth=None, learning_rate=0.9, iteration=1,
                 filename_prefix='IBIPotentials/'):
        self.name = name
        self.degrees_of_freedom = degrees_of_freedom
        self.smooth = smooth
        self.iteration = iteration
        AbstractPotential.__init__(self, range_, resolution, max_force, max_potential, zero)
        self.filename_prefix = filename_prefix + self.name
        self.max_potential = max_potential
        self.out_of_bounds_force = out_of_bounds_force
        self.learning_rate = learning_rate
        self.__target = None
        self.bins = np.arange( self.range_[0], self.range_[1]+self.resolution, self.resolution )
        self.potential = np.zeros( len(self.bins)-1 )
        self.__dists = {}

    def potential(self, r):
        raise NotImplementedError

    def filename(self, types=None, iteration=None, smoothed=True):
        if iteration is None:
            iteration = self.iteration
        return f"{self.filename_prefix}-{iteration:03d}{'' if smoothed else '-raw'}.dat"

    def __str__(self):
        return self.filename()

    def write_file(self):
        pass

    def __hash__(self):
        assert( self.type_ != "None" )
        return hash((self.name, self.range_, self.resolution, self.max_force,
                     self.max_potential, self.out_of_bounds_force,
                     self.filename_prefix, self.periodic))

    def __eq__(self, other):
        for a in ("name", "range_", "resolution", "max_force",
                  "max_potential", "filename_prefix", "periodic"):
            if getattr(self, a) != getattr(other, a):
                return False
        return True

    def _extract_distribution(self, universe, trajectory=None, box=None):
        if trajectory is not None:
            key = hash((universe, trajectory))
        else:
            key = hash(universe)
            trajectory = universe.trajectory

        if key in self.__dists:
            logger.info(f"{self}._extract_distribution( u.{key} ): using cache")
            return self.__dists[key]

        bins = self.bins
        counts = np.zeros( [len(bins)-1] )
        nframes = 0
        with logging_redirect_tqdm(loggers=[logger]):
            for t in tqdm(trajectory, desc=f"Extracting distribution {self}"):
                if box is not None:
                    universe.dimensions = box
                vals = np.stack([dof.get_values(universe) for dof in self.degrees_of_freedom])
                inds = np.digitize(vals, bins) - 1
                ## Discard values that fall outside the binned range
                inds = inds[(inds < len(counts)) & (inds >= 0)]
                counts = counts + np.bincount(inds, minlength=len(counts))
                nframes += 1

        if np.sum(counts) == 0:
            raise ValueError(f'Extracting distribution for "{self.filename()}" failed because no values were found in the range {self.range_}; consider specifying a larger range_ parameter when creating the {self.__class__.__name__}')
        assert( nframes > 0 )
        vol = self.degrees_of_freedom[0].compute_volume(bins)
        assert( vol.sum() > 0 )
        likelihood = counts / (nframes*vol)  ## don't normalize over num values in dofs just yet
        self.__dists[key] = (bins, likelihood)  # record for later
        return bins, likelihood

    def get_target_distribution(self, universe=None, trajectory=None, recalculate=False):
        if self.__target is None:
            f = self.filename_prefix + '.target.dat'
            if (not Path(f).exists()) or recalculate:
                if universe is None:
                    raise ValueError(f'No target distribution file "{f}" was found, and no universe was provided for calculating one')
                bins, vals = self._extract_distribution( universe, trajectory=trajectory )
                if np.sum(vals) == 0:
                    raise ValueError(f'Target distribution for "{f}" is empty')
                Path(f).parent.mkdir(parents=True, exist_ok=True)
                np.savetxt(f, np.array((bins[:-1], vals/np.sum(vals), vals)).T)
            bins, vals, counts = np.loadtxt(f).T
            if np.sum(counts) == 0:
                raise ValueError(f'Target distribution read from "{f}" is empty')
            self.__target = (bins, vals)

            if self.smooth is None:
                bins, vals = self.__target
                ## Set smooth to ~1/2 stddev, rounded to an odd number of bins
                ## (e.g. a 2.0 Angstrom stddev at 0.1 Angstrom resolution gives an 11-point window)
                _mean = np.average( bins, weights=vals )
                _var = np.average( (bins-_mean)**2, weights=vals )
                _dr = (bins[1]-bins[0])
                self.smooth = (int(np.round(np.sqrt(_var)/(2*_dr))+1)//2)*2+1
                if self.smooth < 5:
                    logger.warning(f'{f}: Smoothing ({self.smooth} points) suggested by 1/2 of stddev ({np.sqrt(_var):.2f}) is too low; setting smooth to 5')
                    self.smooth = 5
                else:
                    logger.info(f'{f}: Smoothing over {self.smooth} points, as suggested by 1/2 of stddev ({np.sqrt(_var):.2f})')
        return self.__target

    def get_cg_distribution(self, universe, trajectory=None, box=None, recalculate=False):
        f = self.filename(smoothed=False).replace('.dat', '.cg.dat')
        if (not Path(f).exists()) or recalculate:
            logger.info(f"get_cg_distribution(): writing to '{f}'")
            bins, vals = self._extract_distribution( universe, trajectory=trajectory, box=box )
            bins = bins[:len(vals)]
            Path(f).parent.mkdir(parents=True, exist_ok=True)
            np.savetxt(f, np.array((bins, vals/np.sum(vals), vals)).T)
        else:
            logger.info(f"get_cg_distribution(): reading from '{f}'")
            bins, vals, counts = np.loadtxt(f).T
            key = universe.__hash__()
            if key not in self.__dists:
                self.__dists[key] = (bins, counts)
        return bins, vals

    def read_cg_potential(self, iteration=None):
        if iteration is None:
            iteration = self.iteration-1
        if iteration == 0:
            bins = self.bins[:-1]
            pot = np.zeros(bins.shape)
        else:
            try:
                f = self.filename(iteration=iteration, smoothed=True)
                bins, pot = np.loadtxt(f).T
            except OSError:
                ## Fall back to the unsmoothed potential if the smoothed file is missing
                f = self.filename(iteration=iteration, smoothed=False)
                bins, pot = np.loadtxt(f).T
        return bins, pot

    def _apply_max_force(self, bins, u, rho, tol, savgol_opts):
        if self.max_force is not None:
            valid = np.where(rho > tol)[0]
            first = valid[0]
            last = valid[-1]
            if first > 0:
                u[:first] = u[first] + np.abs(bins[:first]-bins[first])*self.max_force
            if last < len(u)-2:
                u[last+1:] = u[last] + np.abs(bins[last+1:]-bins[last])*self.max_force
        return u

    def _clean_edges(self, bins, rho, tol):
        """ Zeroes the density to the left of the last point below tol
        on the left side of the peak, and to the right of the first
        point below tol on its right side """
        total_before = np.sum(rho)
        peak_i = np.where(np.abs(rho) == np.max(np.abs(rho)))[0][0]
        is_left = (bins < bins[peak_i])
        is_small = (rho <= tol)
        try:
            first_i = np.where(is_left & is_small)[0][-1]
        except IndexError:
            first_i = 0
        try:
            last_i = np.where((~is_left) & is_small)[0][0]
        except IndexError:
            last_i = len(rho)
        rho[:first_i] = 0
        rho[last_i:] = 0
        total_after = np.sum(rho)
        if total_after < 0.9 * total_before:
            raise ValueError(f'Removed too much density from the distribution ({100*total_after/total_before:.0f}%)')

    def write_cg_potential(self, universe=None, scaling_factor=None, temperature=295,
                           tol=None, clean_edges=True, box=None):
        if scaling_factor is None:
            try:
                scaling_factor = self.learning_rate(self.iteration)
            except TypeError:
                scaling_factor = self.learning_rate

        ## Obtain the target before building savgol_opts so that self.smooth is set
        bins_aa, rho_aa = self.get_target_distribution()
        rho_aa = rho_aa / np.sum(rho_aa)

        savgol_opts = dict( window_length=1+(self.smooth//2)*2,
                            polyorder=3,
                            mode='wrap' if self.periodic else 'nearest' )

        if tol is None:
            tol = max(1e-5, 1e-3 * np.max(rho_aa))  # likely there is room for improvement here

        if universe is None:
            ## No CG data yet: start from the Boltzmann inversion of the target
            bins = self.bins[:-1]
            assert( np.all(np.isclose(bins - bins_aa, 0)) )
            if clean_edges:
                self._clean_edges(bins_aa, rho_aa, tol)
            rho_aa = savgol( rho_aa, **savgol_opts )
            rho_aa[rho_aa < tol] = tol
            ## kT at 295 K is 0.58622592 kcal/mol
            u = - scaling_factor * 0.58622592 * (temperature/295) * np.log(rho_aa)
            rho_cg = rho_aa  # allows a common smoothing command below
        else:
            bins, rho_cg = self._extract_distribution( universe, box=box )
            assert( np.abs(len(rho_cg) - len(bins)) < 2 )
            bins = bins[:len(rho_cg)]
            rho_cg = rho_cg/np.sum(rho_cg)
            assert( np.all(np.isclose(bins - bins_aa, 0)) )
            if clean_edges:
                self._clean_edges(bins_aa, rho_aa, tol)
                # self._clean_edges(bins_aa, rho_cg, tol)
            r0, u0 = self.read_cg_potential()  # iteration-2?
            rho_cg, rho_aa = [savgol( rho, **savgol_opts ) for rho in [rho_cg, rho_aa]]
            rho_aa[rho_aa < tol] = tol
            rho_cg[rho_cg < tol] = tol
            ratio = rho_cg/rho_aa
            ## kT at 295 K is 0.58622592 kcal/mol
            du = np.log( ratio )
            du = du * 0.58622592 * (temperature/295)
            du = du * (rho_aa/rho_aa.max())**0.25  # penalize learning for values where target density is very low
            u = u0 + scaling_factor * du

        f = self.filename(smoothed=False)
        Path(f).parent.mkdir(parents=True, exist_ok=True)
        np.savetxt(f, np.array((bins, u)).T)

        f = self.filename(smoothed=True)
        ## Only apply savgol filter in region where target density is well-defined
        valid = np.where(rho_aa > tol)[0]
        first = valid[0]
        last = valid[-1]
        u[first:last+1] = savgol( u[first:last+1], **savgol_opts )

        ## Apply boundary force outside where target density is well-defined
        oobf = self.max_force if self.out_of_bounds_force == 'max_force' else self.out_of_bounds_force
        if first > 0:
            u[:first] = u[first] + np.abs(bins[:first]-bins[first])*oobf
        if last < len(u)-2:
            u[last+1:] = u[last] + np.abs(bins[last+1:]-bins[last])*oobf
        u = self._cap_potential(bins, u)
        np.savetxt(f, np.array((bins, u)).T)
        return bins, u
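

## Minimal sketch of the update rule implemented by write_cg_potential above,
## stripped of smoothing, edge cleaning, and force caps: the potential is
## corrected by the Boltzmann inversion of the ratio of the measured and
## target distributions, scaled by the learning rate. kT at 295 K is
## 0.58622592 kcal/mol.
def _ibi_update_sketch(u_old, rho_cg, rho_target, alpha=0.9, kT=0.58622592):
    """ Illustrative only: u_new(r) = u_old(r) + alpha * kT * ln(rho_cg(r)/rho_target(r)) """
    return u_old + alpha * kT * np.log(rho_cg / rho_target)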

## Specialize with sensible defaults for range_
class IBIBond(AbstractIBIpotential):
    def __init__(self, name, degrees_of_freedom=[], range_=(0,20), resolution=0.02,
                 max_force=None, max_potential=None, out_of_bounds_force='max_force',
                 zero='min', smooth=None, learning_rate=0.9, iteration=1,
                 filename_prefix="IBIPotentials/"):
        AbstractIBIpotential.__init__(self, name, degrees_of_freedom, range_, resolution,
                                      max_force, max_potential, out_of_bounds_force, zero,
                                      smooth, learning_rate, iteration, filename_prefix)
        self.type_ = 'IBIbond'


class IBIAngle(AbstractIBIpotential):
    def __init__(self, name, degrees_of_freedom, range_=(0,180), resolution=2.0,
                 max_force=None, max_potential=None, out_of_bounds_force='max_force',
                 zero='min', smooth=None, learning_rate=0.9, iteration=1,
                 filename_prefix="IBIPotentials/"):
        AbstractIBIpotential.__init__(self, name, degrees_of_freedom, range_, resolution,
                                      max_force, max_potential, out_of_bounds_force, zero,
                                      smooth, learning_rate, iteration, filename_prefix)
        self.type_ = 'IBIangle'


class IBIDihedral(AbstractIBIpotential):
    def __init__(self, name, degrees_of_freedom=[], range_=(-180,180), resolution=4.0,
                 max_force=None, max_potential=None, out_of_bounds_force='max_force',
                 zero='min', smooth=None, learning_rate=0.9, iteration=1,
                 filename_prefix="IBIPotentials/"):
        AbstractIBIpotential.__init__(self, name, degrees_of_freedom, range_, resolution,
                                      max_force, max_potential, out_of_bounds_force, zero,
                                      smooth, learning_rate, iteration, filename_prefix)
        self.type_ = 'IBIdihed'

    @property
    def periodic(self):
        return True

## Specialize with sensible defaults for range_
class IBINonbonded(AbstractIBIpotential):
    def __init__(self, name, degrees_of_freedom=[], range_=(0,50), resolution=0.1,
                 max_force=None, max_potential=None, out_of_bounds_force='max_force',
                 zero='last', smooth=None, learning_rate=0.35, iteration=1,
                 filename_prefix="IBIPotentials/"):
        AbstractIBIpotential.__init__(self, name, degrees_of_freedom, range_, resolution,
                                      max_force, max_potential, out_of_bounds_force, zero,
                                      smooth, learning_rate, iteration, filename_prefix)
        self.type_ = 'IBInonbonded'

    def write_file(self, filename=None, types=None):
        if filename is None:
            filename = self.filename(types=types)
        assert( filename == self.filename(types=types) )
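

## Usage sketch of the overall IBI loop described in the AbstractIBIpotential
## docstring. `run_cg_simulation` is a hypothetical callable standing in for
## whatever runs a coarse-grained simulation with the current potential files
## and returns an MDAnalysis Universe for the resulting trajectory;
## `aa_universe` is the all-atom reference.
def _example_ibi_loop(potentials, aa_universe, run_cg_simulation, n_iterations=10):
    ## Step 1: compute the target distributions (writing them to disk)
    for pot in potentials:
        pot.get_target_distribution(aa_universe)
    ## Write the initial Boltzmann-inverted guess (iteration 1, no CG data yet)
    for pot in potentials:
        pot.write_cg_potential()
    for it in range(2, n_iterations+1):
        for pot in potentials:
            pot.iteration = it
        ## Step 2: simulate with the current potentials
        cg_universe = run_cg_simulation(potentials)
        ## Steps 3-5: compare distributions, update, and save the potentials
        for pot in potentials:
            pot.write_cg_potential(cg_universe)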