Source code for arbdmodel.model

import numpy as np
from copy import copy, deepcopy
from glob import glob
import MDAnalysis as mda

from tqdm import tqdm, trange
from tqdm.contrib.logging import logging_redirect_tqdm

from .logger import devlogger, logger, get_resource_path
from .interactions import NullPotential
from . import Transformable, Parent, Group
from .sim_config import SimConf
from .core_objects import GroupSite
from .engine import ArbdEngine, NamdEngine
from .coords import calculate_dimensions_from_cell_vectors


[docs] class PdbModel(Transformable, Parent): def __init__(self, children=None, dimensions=None, remove_duplicate_bonded_terms=False): Transformable.__init__(self,(0,0,0)) Parent.__init__(self, children, remove_duplicate_bonded_terms) self.dimensions = dimensions self.particles = [p for p in self if not p.rigid] self.rigid_bodies = [p for p in self if p.rigid] self.cacheInvalid = True def _updateParticleOrder(self): pass
[docs] def write_pdb(self, filename, beta_from_fixed=False): if self.cacheInvalid: self._updateParticleOrder() with open(filename,'w') as fh: ## Write header fh.write("CRYST1{:>9.3f}{:>9.3f}{:>9.3f} 90.00 90.00 90.00 P 1 1\n".format( *self.dimensions )) ## Write coordinates formatString = "ATOM {idx:>6.6s} {name:^4.4s} {resname:3.3s} {chain:1.1s}{resid:>5.5s} {x:8.8s}{y:8.8s}{z:8.8s}{occupancy:6.2f}{beta:6.2f} {charge:2d}{segname:>6s}\n" for p in self.particles: ## http://www.wwpdb.org/documentation/file-format-content/format33/sect9.html#ATOM data = p._get_psfpdb_dictionary() idx = data['idx'] if np.log10(idx) >= 5: idx = " *****" else: idx = "{:>6d}".format(idx) data['idx'] = idx if beta_from_fixed: data['beta'] = 1 if 'fixed' in p.__dict__ else 0 pos = p.get_collapsed_position() dig = [max(int(np.log10(np.abs(x)+1e-6)//1),0)+1 for x in pos] for d in dig: assert( d <= 7 ) # assert( np.all(dig <= 7) ) fs = ["{: %d.%df}" % (8,7-d) for d in dig] x,y,z = [f.format(x) for f,x in zip(fs,pos)] data['x'] = x data['y'] = y data['z'] = z assert(data['resid'] < 1e5) data['charge'] = int(data['charge']) data['resid'] = "{:<4d}".format(data['resid']) fh.write( formatString.format(**data) ) return
[docs] def write_pqr(self, filename): if self.cacheInvalid: self._updateParticleOrder() with open(filename,'w') as fh: ## Write header fh.write("CRYST1{:>9.3f}{:>9.3f}{:>9.3f} 90.00 90.00 90.00 P 1 1\n".format( *self.dimensions )) ## Write coordinates formatString = "ATOM {idx:>6.6s} {name:^4.4s} {resname:3.3s} {chain:1.1s} {resid:>5.5s} {x:.6f} {y:.6f} {z:.6f} {charge} {radius}\n" for p in self.particles: data = p._get_psfpdb_dictionary() idx = data['idx'] if np.log10(idx) >= 5: idx = " *****" else: idx = "{:>6d}".format(idx) data['idx'] = idx x,y,z = p.get_collapsed_position() data['x'] = x data['y'] = y data['z'] = z assert(data['resid'] < 1e5) data['resid'] = "{:<4d}".format(data['resid']) if 'radius' not in data: data['radius'] = 2 * (data['mass']/16)**0.333333 fh.write( formatString.format(**data) ) return
[docs] def write_psf(self, filename): if self.cacheUpToDate == False: self._updateParticleOrder() with open(filename,'w') as fh: ## Write header fh.write("PSF NAMD\n\n") # create NAMD formatted psf fh.write("{:>8d} !NTITLE\n\n".format(0)) ## ATOMS section fh.write("{:>8d} !NATOM\n".format(len(self.particles))) ## From vmd/plugins/molfile_plugin/src/psfplugin.c ## "%d %7s %10s %7s %7s %7s %f %f" formatString = "{idx:>8d} {segname:7.7s} {resid:<10.10s} {resname:7.7s}" + \ " {name:7.7s} {type:7.7s} {charge:f} {mass:f}\n" for p in self.particles: data = p._get_psfpdb_dictionary() data['resid'] = "%d%c%c" % (data['resid']," "," ") # TODO: work with large indices fh.write( formatString.format(**data) ) fh.write("\n") ## Write out bonds bonds = self.get_bonds() fh.write("{:>8d} !NBOND\n".format(len(bonds))) counter = 0 for p1,p2,b,ex in bonds: fh.write( "{:>8d}{:>8d}".format(p1.idx+1,p2.idx+1) ) counter += 1 if counter == 4: fh.write("\n") counter = 0 else: fh.write(" ") fh.write("\n" if counter == 0 else "\n\n") ## Write out angles angles = self.get_angles() fh.write("{:>8d} !NTHETA\n".format(len(angles))) counter = 0 for p1,p2,p3,a in angles: fh.write( "{:>8d}{:>8d}{:>8d}".format(p1.idx+1,p2.idx+1,p3.idx+1) ) counter += 1 if counter == 3: fh.write("\n") counter = 0 else: fh.write(" ") fh.write("\n" if counter == 0 else "\n\n") ## Write out dihedrals dihedrals = self.get_dihedrals() fh.write("{:>8d} !NPHI\n".format(len(dihedrals))) counter = 0 for p1,p2,p3,p4,a in dihedrals: fh.write( "{:>8d}{:>8d}{:>8d}{:>8d}".format(p1.idx+1,p2.idx+1,p3.idx+1,p4.idx+1) ) counter += 1 if counter == 2: fh.write("\n") counter = 0 else: fh.write(" ") fh.write("\n" if counter == 0 else "\n\n") ## Write out impropers impropers = self.get_impropers() fh.write("{:>8d} !NIMPHI\n".format(len(impropers))) counter = 0 for p1,p2,p3,p4,a in impropers: fh.write( "{:>8d}{:>8d}{:>8d}{:>8d}".format(p1.idx+1,p2.idx+1,p3.idx+1,p4.idx+1) ) counter += 1 if counter == 2: fh.write("\n") counter = 0 else: fh.write(" ") fh.write("\n" if counter == 0 else "\n\n") fh.write("\n{:>8d} !NDON: donors\n\n\n".format(0)) fh.write("\n{:>8d} !NACC: acceptors\n\n\n".format(0)) fh.write("\n 0 !NNB\n\n") natoms = len(self.particles) for i in range(natoms//8): fh.write(" 0 0 0 0 0 0 0 0\n") for i in range(natoms-8*(natoms//8)): fh.write(" 0") fh.write("\n\n 1 0 !NGRP\n\n")
[docs] class ArbdModel(PdbModel): """ Advanced model class for ARBD simulations with improved cell vector handling. """ def __init__(self, children, cell_vectors=None, cell_origin=None, dimensions=None, remove_duplicate_bonded_terms=True, buffer_factor=1.2, configuration=None, dummy_types=tuple(), **conf_params): """ Initialize ArbdModel with improved cell vector and dimension handling. Args: children: Child objects for the model cell_vectors: List of 3 cell basis vectors [[x1,y1,z1], [x2,y2,z2], [x3,y3,z3]] cell_origin: Cell origin coordinates [x,y,z] dimensions: Explicit dimensions for the simulation box (overrides cell_vectors) remove_duplicate_bonded_terms: Whether to remove duplicate bonded terms buffer_factor: Factor to scale dimensions derived from cell vectors configuration: SimConf object with configuration parameters dummy_types: Deprecated parameter, kept for backward compatibility **conf_params: Additional configuration parameters """ # Calculate dimensions from cell vectors if provided and dimensions not explicitly set if dimensions is None and cell_vectors is not None: dimensions = calculate_dimensions_from_cell_vectors( cell_vectors, cell_origin, buffer_factor) # Default dimensions if neither dimensions nor cell vectors provided if dimensions is None: dimensions = (1000, 1000, 1000) # Set default cell vectors if not provided if cell_vectors is None: cell_vectors = [ [dimensions[0], 0, 0], [0, dimensions[1], 0], [0, 0, dimensions[2]] ] # Set default cell origin if not provided if cell_origin is None: cell_origin = [0, 0, 0] # Initialize parent class PdbModel.__init__(self, children, dimensions, remove_duplicate_bonded_terms, cell_vectors, cell_origin) # Store origin which might be different from cell_origin self.origin = cell_origin # Set up configuration if configuration is None: configuration = SimConf(**conf_params) self.configuration = configuration # Initialize model properties self.num_particles = 0 self.particles = [] self.type_counts = None self.dummy_types = dummy_types # TODO, determine whether these are really needed if len(self.dummy_types) != 0: raise("Dummy types have been deprecated") self.nonbonded_interactions = [] self._nonbonded_interaction_files = [] # This could be made more robust self.cacheUpToDate = False self.group_sites = []
[docs] def add(self, obj): self._clear_types() # TODO: call upon (potentially nested) `child.add(add)` return Parent.add(self, obj)
[docs] def add_group_site(self, particles, weights=None): g = GroupSite(particles, weights) self.group_sites.append(g) return g
def _clear_types(self): devlogger.debug(f'{self}: Clearing types') self.type_counts = None
[docs] def clear_all(self, keep_children=False): Parent.clear_all(self, keep_children=keep_children) self.particles = [] self.num_particles = 0 self._clear_types() self.group_sites = [] self._nonbonded_interaction_files = []
[docs] def extend(self, other_model, copy=False): if any( [p.rigid for p in self] + [p.rigid for p in other_model] ): raise NotImplementedError('Models containing rigid bodies cannot yet be extended') assert( isinstance(other_model, ArbdModel) ) if copy == True: logger.warning(f'Forcing {self.__class__}.extend(other_model,copy=False)') copy = False ## Combine particle types, taking care to handle name clashes self._countParticleTypes() other_model._countParticleTypes() self.getParticleTypesAndCounts() other_model.getParticleTypesAndCounts() names = set([t.name for t in self.type_counts.keys()]) devlogger.debug(f'Combining types {self.type_counts.keys()} and {other_model.type_counts.keys()}') for t1 in self.type_counts.keys(): for t2 in other_model.type_counts.keys(): if t1.name == t2.name and t1.__eq__(t2, check_equal=False): i = 1 new_name = f'{t2.name}{i}' while new_name in names: i += 1 new_name = f'{t2.name}{i}' devlogger.debug(f'Updating {t2.name} to {new_name}') t2.name = new_name names.add(new_name) ## Combine interactions for i, tA, tB in other_model.nonbonded_interactions: devlogger.debug(f'Combining model interactions {i} {tA} {tB}') self.add_nonbonded_interaction(i,tA,tB) # for g in other_model.children: # self.update(g, copy=copy) g = Group() for attr in 'children position orientation bonds angles dihedrals impropers exclusions vector_angles bond_angles product_potentials group_sites'.split(): g.__setattr__(attr, other_model.__getattribute__(attr)) devlogger.debug(f'Updating {self} with {g.children[0].children[0]}') self.update(g, copy=copy) self._clear_types() ## Combine configurations self.configuration = other_model.configuration.combine(self.configuration, policy='best', warn=True)
[docs] def assign_IBI_degrees_of_freedom(self): """ Convenience routine that adds degrees of freedom to corresponding IBI potentials """ from .ibi import BondDof, AngleDof, DihedralDof, PairDistributionDof self.bonded_ibi_potentials = set() self.nonbonded_ibi_potentials = set() for get_fn, parts_pot_fn, cls in ( (self.get_bonds, lambda x: (x[:2],x[2]), BondDof), (self.get_angles, lambda x: (x[:3],x[3]), AngleDof), (self.get_dihedrals, lambda x: (x[:4],x[4]), DihedralDof) ): for x in get_fn(): parts, pot = parts_pot_fn(x) try: pot.degrees_of_freedom except: continue pot.degrees_of_freedom.append( cls(*parts) ) if pot not in self.bonded_ibi_potentials: self.bonded_ibi_potentials.add( pot ) logger.info(f'Gathering nonbonded IBI degrees of freedom') all_exclusions = self.get_exclusions() try: raise # particle.idx wasn't set without this if len(self.type_counts) == 0: raise except: self.getParticleTypesAndCounts() type_to_particles = dict() for p in self: t = p.type_ if t not in type_to_particles: type_to_particles[t] = [] type_to_particles[t].append(p) type_to_particles[None] = [p for p in self] already_handled = set() for pot, tA, tB in self.nonbonded_interactions: ## Avoid including particle type pairs that don't apply if (tA,tB) in already_handled: continue already_handled.add((tA,tB)) already_handled.add((tB,tA)) try: pot.degrees_of_freedom except: continue def _cond(pair): b1 = (tA is None or pair[0].type_ == tA) and (tB is None or pair[1].type_ == tB) b2 = (tB is None or pair[0].type_ == tB) and (tA is None or pair[1].type_ == tA) return (b1 or b2) ex = list(filter(_cond, all_exclusions)) dof = PairDistributionDof( type_to_particles[tA], type_to_particles[tB], exclusions=ex, range_=pot.range_, resolution=pot.resolution) pot.degrees_of_freedom.append( dof ) if pot not in self.nonbonded_ibi_potentials: self.nonbonded_ibi_potentials.add( pot )
[docs] def load_target_IBI_distributions(self): raise NotImplementedError
[docs] def run_IBI(self, iterations, directory = './', engine = None, replicas = 1, run_minimization = True, first_iteration=1, target_universe = None, target_trajectory = None): """ Run Iterative Boltzmann Inversion (IBI) to optimize coarse-grained potentials. This method performs IBI iterations to match target distributions. For each iteration, it writes CG potentials, runs simulations, and extracts distributions from the resulting trajectories to update the potentials for the next iteration. Parameters ---------- iterations : int Number of IBI iterations to run directory : str, optional Directory to store output files, defaults to current directory engine : ArbdEngine, optional Simulation engine to use. If None, creates a default engine with 5e6 steps and 1e4 output period replicas : int, optional Number of replica simulations to run per iteration, defaults to 1 run_minimization : bool, optional Whether to run a brief minimization before the first iteration, defaults to True first_iteration : int, optional Starting iteration number, useful for continuing previous IBI runs, defaults to 1 target_universe : MDAnalysis.Universe, optional Universe object containing target structure and trajectory target_trajectory : str or list, optional Target trajectory file(s) to use if target_universe is provided Raises ------ ValueError If the model does not have any IBI potentials assigned NotImplementedError If the system does not have an orthorhombic unit cell Notes ----- The method requires bonded_ibi_potentials and nonbonded_ibi_potentials to be defined in the model. Use assign_IBI_degrees_of_freedom() before calling this method. """ try: assert( len(self.bonded_ibi_potentials) + len(self.nonbonded_ibi_potentials) > 0 ) except: raise ValueError('Model does not appear to contain IBI potentials; perhaps you forgot to run self.assign_IBI_degrees_of_freedom()') logger.info(f'Running {iterations} IBI iterations with {len(self.bonded_ibi_potentials)} bonded and {len(self.nonbonded_ibi_potentials)} nonbonded IBI potentials') if engine is None: engine = ArbdEngine( num_steps = 5e6, output_period = 1e4, ) if np.array(self.dimensions).size > 3: raise NotImplementedError('IBI only implemented for systems with orthorhombic unit cells') else: box = tuple(list(self.dimensions[:3]) + ([90]*3)) restart_file = None if target_universe is not None: _potentials = list(self.bonded_ibi_potentials)+list(self.nonbonded_ibi_potentials) with logging_redirect_tqdm(loggers=[logger,devlogger]): for potential in tqdm(_potentials, desc='Calculating target distributions'): potential.get_target_distribution(target_universe, trajectory=target_trajectory) def _load_cg_u(iteration): name = f'ibi-{iteration:03d}' psf = '{}/{}.psf'.format(directory,name) globstring=f'{directory}/output/{name}.*dcd' dcds = [f for f in glob(globstring) if 'momentum' not in f] if len(dcds) == 0: raise ValueError(f'Expected to find dcds at {globstring}') cg_u = mda.Universe(psf,*dcds) return cg_u cg_u = None if first_iteration > 1: cg_u = _load_cg_u(first_iteration-1) with logging_redirect_tqdm(loggers=[logger,devlogger]): for p in tqdm(_potentials, desc='Calculating initial CG distributions'): p.get_cg_distribution(cg_u, box=box, recalculate=False) for p in _potentials: p.iteration = first_iteration for i in range(first_iteration, iterations+1): logger.info(f'Working on IBI iteration {i}/{iterations}') with logging_redirect_tqdm(loggers=[logger,devlogger]): for p in tqdm(_potentials, desc='Writing CG potentials'): # if 'IBIPotentials/intrabond-1' in p.filename(): import ipdb; ipdb.set_trace() try: p.write_cg_potential(cg_u, tol=p.tol, box=box) except: p.write_cg_potential(cg_u, box=box) if i == 1 and run_minimization: logger.info(f'Running brief simulation with small timestep') ts0 = engine._get_combined_conf(self).timestep engine.simulate( self, output_name = 'ibi-min', directory = directory, timestep = ts0/100, num_steps = 10000, output_period=1000 ) restart_file = f'{directory}/output/ibi-min.restart' name = f'ibi-{i:03d}' engine.simulate( self, output_name = name, directory = directory, restart_file = restart_file, replicas = replicas ) restart_file = f'{directory}/output/{name}{".0" if replicas > 1 else ""}.restart' cg_u = _load_cg_u(i) with logging_redirect_tqdm(loggers=[logger,devlogger]): for p in tqdm(_potentials, desc='Extracting CG distributions'): p.get_cg_distribution(cg_u, box=box, recalculate=False) p.iteration += 1
[docs] def update(self, group , copy=False): assert( isinstance(group, Group) ) if copy: group = deepcopy(group) group.parent = self self.add(group)
def _get_nonbonded_interaction(self, typeA, typeB): for cp in (False,True): for s,A,B in self.nonbonded_interactions: if A is None or B is None: if A is None and B is None: return s elif A is None and typeB.is_same_type(B, consider_parents=cp): return s elif B is None and typeA.is_same_type(A, consider_parents=cp): return s elif typeA.is_same_type(A,consider_parents=False) and typeB.is_same_type(B,consider_parents=cp): return s # raise Exception("No nonbonded scheme found for %s and %s" % (typeA.name, typeB.name)) # print("WARNING: No nonbonded scheme found for %s and %s" % (typeA.name, typeB.name)) def _countParticleTypes(self): ## TODO: check for modifications to particle that require ## automatic generation of new particle type type_counts = dict() # type is key, value is 2-element list of regular particle counts and attached particles parts, self.rigid_bodies = [],[] for p in self: if p.rigid: self.rigid_bodies.append(p) else: parts.append(p) for p in parts: t = p.type_ if t in type_counts: type_counts[t][0] += 1 else: type_counts[t] = [1,0] parts = [p for rb in self if rb.rigid for p in rb.attached_particles] for p in parts: t = p.type_ if t in type_counts: type_counts[t][1] += 1 else: type_counts[t] = [0,1] if len(self.dummy_types) != 0: raise("Dummy types have been deprecated") # for t in self.dummy_types: # if t not in type_counts: # type_counts[t] = 0 for i,tA,tB in self.nonbonded_interactions: if tA is not None and tA not in type_counts: type_counts[tA] = [0,0] if tB is not None and tB not in type_counts: type_counts[tB] = [0,0] self.type_counts = type_counts rbtc = dict() rbti={} for rb in self.rigid_bodies: t = rb.type_.name if t in rbtc: rbtc[t] += 1 else: rbti[t]=rb.type_ rbtc[t] = 1 self.rigid_body_type_counts = [(k,rbtc[k]) for k in sorted(rbtc.keys())] self.rigid_body_index=rbti devlogger.debug(f'{self}: Counting types: {type_counts}') devlogger.debug(f'{self}: Counting rb types: {rbtc}') def _updateParticleOrder(self): ## Create ordered list self.particles = [p for p in self if not p.rigid] self.rigid_bodies = list(sorted([p for p in self if p.rigid], key=lambda x: x.type_)) # self.particles = sorted(particles, key=lambda p: (p.type_, p.idx)) ## Update particle indices idx = 0 for p in self.particles: p.idx = idx idx = idx+1 ## Add attached particle indices # attach particles for j,rb in enumerate(self.rigid_bodies): for p in rb.attached_particles: p.idx = idx idx = idx+1 ## TODO recurse through childrens' group_sites for g in self.group_sites: g.idx = idx idx = idx + 1 # self.initialCoords = np.array([p.initialPosition for p in self.particles])
[docs] def useNonbondedScheme(self, nbScheme, typeA=None, typeB=None): """ deprecated """ logger.warning('useNonbondedScheme is deprecated! Please update your code to use `add_nonbonded_interaction`') self.add_nonbonded_interaction(nbScheme, typeA, typeB)
[docs] def add_nonbonded_interaction(self, nonbonded_potential, typeA=None, typeB=None): self.nonbonded_interactions.append( [nonbonded_potential, typeA, typeB] ) if typeA != typeB: self.nonbonded_interactions.append( [nonbonded_potential, typeB, typeA] )
[docs] def prepare_for_simulation(self): ...
[docs] def getParticleTypesAndCounts(self): """ Includes rigid body-attached particles """ ## TODO: remove(?) if self.type_counts is None: self._countParticleTypes() self._updateParticleOrder() return sorted( self.type_counts.items(), key=lambda x: x[0] )
def _particleTypePairIter(self): typesAndCounts = self.getParticleTypesAndCounts() i_skipped = 0 for i in range(len(typesAndCounts)): t1,(n1,rb1) = typesAndCounts[i] if n1+rb1 == 0: i_skipped += 1 continue j_skipped = 0 for j in range(i,len(typesAndCounts)): t2,(n2,rb2) = typesAndCounts[j] if n2+rb2 == 0: j_skipped += 1 continue if n2 == 0: continue yield( [i-i_skipped,j-i_skipped-j_skipped,t1,t2] )
[docs] def dimensions_from_structure( self, padding_factor=1.5, isotropic=False ): raise(NotImplementedError)
[docs] def simulate(self, output_name, **kwargs): ## split kwargs sim_kws = ['output_directory', 'directory', 'log_file', 'binary', 'num_procs', 'dry_run', 'configuration', 'replicas'] sim_kwargs = {kw:kwargs[kw] for kw in sim_kws if kw in kwargs} engine_kwargs = {k:v for k,v in kwargs.items() if k not in sim_kws} engine = ArbdEngine(**engine_kwargs) return engine.simulate(self, output_name, **sim_kwargs)