## Import packages
from pathlib import Path
import numpy as np
import os, sys, subprocess
import platform
from abc import abstractmethod, ABCMeta
import shutil
from .core_objects import GroupSite
from .logger import devlogger, logger, get_resource_path
from .interactions import NullPotential
from .sim_config import SimConf
""" SimEngines
Abstract classes for running simulations with different engines.
SimEngine is the base class for all simulation engines.
"""
[docs]
class SimEngine(metaclass=ABCMeta):
""" Abstract class for running a simulation of a model """
def __init__(self, configuration=None):
self.configuration = configuration
@property
@abstractmethod
def default_binary(self):
...
@abstractmethod
def _generate_command_string(self, binary, output_name, output_directory, gpu=0, replicas=1):
...
[docs]
@abstractmethod
def write_simulation_files(self, model, output_name):
...
[docs]
@abstractmethod
def get_default_conf(self):
...
[docs]
def simulate(self, model, output_name, output_directory='output',
directory='.', log_file=None,
binary=None, num_procs=None, dry_run = False, configuration = None, replicas = 1, **conf_params):
## TODO: Allow _get_combined_conf to take certain parameters as arguments, or otherwise refactor to make this more elegant
gpu = self._get_combined_conf(model, **conf_params).gpu
assert(type(gpu) is int)
if num_procs is None:
import multiprocessing
num_procs = max(1,multiprocessing.cpu_count()-1)
d_orig = os.getcwd()
try:
model._d_orig = d_orig
if not os.path.exists(directory):
os.makedirs(directory)
os.chdir(directory)
model.prepare_for_simulation()
if output_directory == '': output_directory='.'
if dry_run:
if binary is None: binary=self.default_binary
else:
binary = self._get_binary(binary)
if not os.path.exists(output_directory):
os.makedirs(output_directory)
elif not os.path.isdir(output_directory):
raise Exception("output_directory '%s' is not a directory!" % output_directory)
self.write_simulation_files(model, output_name, configuration, **conf_params)
## http://stackoverflow.com/questions/18421757/live-output-from-subprocess-command
cmd = self._generate_command_string(binary, output_name, output_directory, num_procs, gpu, replicas)
if dry_run:
logger.info(f'Run with: {" ".join(cmd)}')
else:
logger.info(f'Running {self.default_binary} with: {" ".join(cmd)}')
if log_file is None or (hasattr(log_file,'write') and callable(log_file.write)):
fd = sys.stdout if log_file is None else log_file
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False) # , universal_newlines=True)
## re-open to get \r recognized as new line
with open(os.dup(process.stdout.fileno()), newline='') as nice_stdout:
for line in nice_stdout:
try:
fd.write(line)
except:
print("WARNING: could not encode line; your locale might not be set correctly")
fd.flush()
else:
with open(log_file,'w') as fd:
process = subprocess.Popen(cmd, stdout=log_file, universal_newlines=True)
process.communicate()
except:
raise
finally:
del model._d_orig
os.chdir(d_orig)
def _get_binary(self, binary=None):
if binary is None:
from .binary_manager import BinaryManager
binary_name = self.default_binary
binary = BinaryManager.get_binary_path(binary_name)
if binary is None:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
fname = os.path.join(path, self.default_binary)
if os.path.isfile(fname) and os.access(fname, os.X_OK):
binary = fname
break
if binary is None: raise Exception("{} was not found".format(self.default_binary))
if not os.path.exists(binary):
raise Exception("{} was not found".format(self.default_binary))
if not os.path.isfile(binary):
raise Exception("{} was not found".format(self.default_binary))
if not os.access(binary, os.X_OK):
raise Exception("{} is not executable".format(self.default_binary))
return binary
def _get_combined_conf(self, model, **conf_params):
conf = self.get_default_conf()
conf = conf.combine(model.configuration)
conf = conf.combine(self.configuration)
conf = conf.combine(SimConf(**conf_params))
return conf
""" TODO: Remove
# def _configuration_as_dict(self, model):
# params = dict()
# for o in (self.configuration, self, model):
# for k in _get_properties_and_dict_keys(o):
# params[k] = o.__get_attribute__(k)
# return params
"""
[docs]
class ArbdEngine(SimEngine):
"""
Interface to ARBD simulation engine.
The ArbdEngine class provides an interface to the ARBD (Atomically-Resolved Brownian Dynamics)
simulation engine. It handles the generation of simulation configuration files, particle coordinates,
and various potential files required for ARBD simulations.
Attributes:
extra_bd_file_lines (str): Additional lines to be added to the BD configuration file.
configuration (SimConf): The simulation configuration object.
num_particles (int): Number of particles in the system.
particles (list): List of particles in the system.
type_counts (dict): Count of each particle type.
_written_bond_files (dict): Dictionary tracking written bond files.
cacheUpToDate (bool): Flag indicating if the cache is current.
potential_directory (str): Directory for storing potential files.
rb_type_dirs (dict): Directories for rigid body types.
"""
def __init__(self, extra_bd_file_lines="", configuration=None, **conf_params):
self.extra_bd_file_lines = extra_bd_file_lines
if configuration is None:
configuration = SimConf(**conf_params)
SimEngine.__init__(self, configuration)
self.num_particles = 0
self.particles = [] # TODO decide if this should belong here, or with model
self.type_counts = None # TODO decide if this should belong here, or with model
self._written_bond_files = dict()
self.cacheUpToDate = False
@property
def default_binary(self):
return 'arbd'
def _generate_command_string(self, binary, output_name, output_directory, num_procs=None, gpu=0, replicas=1):
cmd = [binary, '-g', "%d" % gpu]
if replicas > 1:
cmd = cmd + ['-r',replicas]
cmd = cmd + ["%s.bd" % output_name, "%s/%s" % (output_directory, output_name)]
cmd = tuple(str(x) for x in cmd)
return cmd
[docs]
def get_default_conf(self):
conf = SimConf(num_steps=1e5, output_period=1e3,
integrator='MD', timestep=20e-6, thermostat='Langevin', barostat=None,
temperature=295, pressure=1,
cutoff=50, pairlist_distance=None, decomp_period=40, gpu = 0,
seed=None, restart_file=None)
return conf
# -------------------------- #
# Methods for printing model #
# -------------------------- #
[docs]
def write_simulation_files(self, model, output_name, configuration=None, **conf_params):
"""
Write simulation files for a model.
This method generates all necessary files for simulation, including system
topology files (PSF, PDB), particle coordinates, and various potential files.
Parameters
----------
model : object
The model object containing all necessary information about the system.
output_name : str
Base name for output files.
configuration : dict, optional
Configuration dictionary for the model. If None, a configuration will be
generated using _get_combined_conf.
**conf_params : dict
Additional configuration parameters to pass to _get_combined_conf if
configuration is None.
Note
-----
The method creates directory structure for potentials and rigid body types
if they don't exist. It generates the following files:
- PSF and PDB topology files
- Particle configuration file
- Various potential files (restraint, bond, angle, dihedral, etc.)
- Rigid body coordinate and configuration files
- Creates directories if they don't exist
- Sets self.potential_directory and self.rb_type_dirs attributes
- Writes multiple files to disk
"""
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
## TODO: save and reference directories and prefixes using member data
d = self.potential_directory = "potentials"
if not os.path.exists(d):
os.makedirs(d)
rb_type_dirs = {}
for rbk, num in model.rigid_body_type_counts:
if num > 0:
rbt = model.rigid_body_index[rbk]
rb_dir = rbt.name # Create top-level directory named after the RigidBodyType
if not os.path.exists(rb_dir):
os.makedirs(rb_dir)
rb_type_dirs[rbt.name] = rb_dir
main_potentials_dir=self.potential_directory
self.rb_type_dirs = rb_type_dirs
model.write_psf( output_name+'.psf' )
model.write_pdb( output_name+'.pdb' )
self._write_particle_file(model, output_name + ".particles.txt", configuration)
self._write_restraint_file(model, f"{main_potentials_dir}/{output_name}.restraint.txt")
self._write_bond_file(model, f"{main_potentials_dir}/{output_name}.bond.txt")
self._write_angle_file(model, f"{main_potentials_dir}/{output_name}.angle.txt")
self._write_dihedral_file(model, f"{main_potentials_dir}/{output_name}.dihedral.txt")
self._write_vector_angle_file(model, f"{main_potentials_dir}/{output_name}.vecangle.txt")
self._write_exclusion_file(model, f"{main_potentials_dir}/{output_name}.exclusion.txt")
self._write_bond_angle_file(model, f"{main_potentials_dir}/{output_name}.bond-angle.txt")
self._write_product_potential_file(model, f"{main_potentials_dir}/{output_name}.product_potential.txt")
self._write_group_sites_file(model, f"{main_potentials_dir}/{output_name}.group_sites.txt")
self._write_rb_coordinate_file(model, configuration, f'{output_name}.rbcoords.txt')
self._write_potential_files(model, output_name, directory=main_potentials_dir, configuration=configuration)
self._write_rb_attached_particles_files(model, output_name, configuration)
self._write_conf(model, output_name, configuration)
## , numSteps=numSteps, outputPeriod=outputPeriod, restart_file=restart_file )
def _write_particle_file(self, model, filename, configuration=None, **conf_params):
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
with open(filename,'w') as fh:
if configuration.integrator in ('Brown', 'Brownian', 'BD'):
for p in model.particles:
data = tuple([p.idx,p.type_.name] + [x for x in p.get_collapsed_position()])
fh.write("ATOM %d %s %f %f %f\n" % data)
else:
for p in model.particles:
data = [p.idx,p.type_.name] + [x for x in p.get_collapsed_position()]
try:
data = data + p.momentum
except:
try:
data = data + p.velocity*p.mass
except:
data = data + [0,0,0]
fh.write("ATOM %d %s %f %f %f %f %f %f\n" % tuple(data))
def _write_rb_attached_particles_files(self, model, output_name, configuration=None, **conf_params):
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
if len(model.rigid_bodies) > 0:
for rbk, num in model.rigid_body_type_counts:
rbt = model.rigid_body_index[rbk]
devlogger.debug(f'Writing attached particles file for rigid body type {rbt}')
if num > 0 and len(rbt.attached_particles) > 0:
# Use RB-specific directory
rb_dir = self.rb_type_dirs.get(rbt.name)
if not rb_dir:
logger.warning(f"No directory found for RigidBodyType {rbt.name}, using default")
rb_dir = self.potential_directory
# Create the attached particles file inside the RB directory
f = os.path.join(rb_dir, f"attached_particles.txt")
rbt._attached_particles_filename = f
with open(f, 'w') as fh:
for p in rbt.attached_particles:
x, y, z = p.position
fh.write(f'{p.type_.name} {x} {y} {z}\n')
def _write_rigid_group_file(self, model, filename, groups):
raise Exception('Deprecated')
with open(filename,'w') as fh:
for g in groups:
fh.write("#Group\n")
try:
if len(g.trans_damping) != 3: raise
fh.write(" ".join(str(v) for v in g.trans_damping) + " ")
except:
raise ValueError("Group {} lacks 3-value 'trans_damping' attribute")
try:
if len(g.rot_damping) != 3: raise
fh.write(" ".join(str(v) for v in g.rot_damping) + " ")
except:
raise ValueError("Group {} lacks 3-value 'rot_damping' attribute")
fh.write("{}\n".format(len(g)))
particles = [p for p in g]
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
for c in chunks(particles,8):
fh.write(" ".join(str(p.idx) for p in c) + "\n")
def _write_potential_files(self, model, prefix, directory = "potentials", configuration=None, **conf_params):
try:
os.makedirs(directory)
except OSError:
if not os.path.isdir(directory):
raise
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
path_prefix = "{}/{}".format(directory,prefix)
self._write_nonbonded_parameter_files( model, path_prefix + "-nb", configuration )
def _write_nonbonded_parameter_files(self, model, prefix, configuration=None, **conf_params):
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
model._nonbonded_interaction_files = [] # clear old nb files
x = np.arange(0, configuration.cutoff)
for i,j,t1,t2 in model._particleTypePairIter():
interaction = model._get_nonbonded_interaction(t1,t2)
if interaction is None: interaction = NullPotential()
try:
f = interaction.filename(types=(t1,t2))
except:
f = "%s.%s-%s.dat" % (prefix, t1.name, t2.name)
logger.debug(f'_write_nonbonded_parameter_files could not find filename for {interaction}; using default {f}')
devlogger.debug(f'_write_nonbonded_parameter_files: {i}, {j}, {t1}, {t2}, {interaction}')
old_range = interaction.range_
if not isinstance(interaction,NullPotential):
interaction.range_ = [0, configuration.cutoff]
interaction.write_file(f, (t1, t2))
interaction.range_ = old_range
model._nonbonded_interaction_files.append(f)
devlogger.debug(f'model._nonbonded_interaction_files: {model._nonbonded_interaction_files}')
def _write_restraint_file( self, model, filename ):
self._restraint_filename = filename
with open(self._restraint_filename,'w') as fh:
for i,restraint in model.get_restraints():
item = [i.idx]
if len(restraint) == 1:
item.append(restraint[0])
if isinstance(i, GroupSite):
item.extend(i.get_center())
else:
item.extend(i.get_collapsed_position())
elif len(restraint) == 2:
item.append(restraint[0])
item.extend(restraint[1])
elif len(restraint) == 5:
item.extend(restraint)
fh.write("RESTRAINT %d %f %f %f %f\n" % tuple(item))
def _write_bond_file( self, model, filename ):
self._bond_filename = filename
for b in list( set( [b for i,j,b,ex in model.get_bonds()] ) ):
if type(b) is not str and not isinstance(b, Path):
b.write_file()
with open(self._bond_filename,'w') as fh:
for i,j,b,ex in model.get_bonds():
try:
bfile = b.filename()
except:
bfile = str(b)
item = (i.idx, j.idx, bfile)
if ex:
fh.write("BOND REPLACE %d %d %s\n" % item)
else:
fh.write("BOND ADD %d %d %s\n" % item)
def _write_angle_file( self, model, filename ):
self._angle_filename = filename
for b in list( set( [b for i,j,k,b in model.get_angles()] ) ):
if type(b) is not str and not isinstance(b, Path):
b.write_file()
with open(self._angle_filename,'w') as fh:
for b in model.get_angles():
try:
bfile = b[-1].filename()
except:
bfile = str(b[-1])
item = tuple([p.idx for p in b[:-1]] + [bfile])
fh.write("ANGLE %d %d %d %s\n" % item)
def _write_dihedral_file( self, model, filename ):
self._dihedral_filename = filename
for b in list( set( [b for i,j,k,l,b in model.get_dihedrals()] ) ):
if type(b) is not str and not isinstance(b, Path):
b.write_file()
with open(self._dihedral_filename,'w') as fh:
for b in model.get_dihedrals():
try:
bfile = b[-1].filename()
except:
bfile = str(b[-1])
item = tuple([p.idx for p in b[:-1]] + [bfile])
fh.write("DIHEDRAL %d %d %d %d %s\n" % item)
def _write_vector_angle_file( self, model, filename ):
self._vector_angle_filename = filename
for b in list( set( [b for i,j,k,l,b in model.get_vector_angles()] ) ):
if type(b) is not str and not isinstance(b, Path):
b.write_file()
if len(model.vector_angles) > 0:
with open(self._vector_angle_filename,'w') as fh:
for b in model.get_vector_angles():
p = b[-1]
try:
bfile = p.filename()
except:
bfile = str(p)
item = tuple([p.idx for p in b[:-1]] + [bfile])
fh.write("VECANGLE %d %d %d %d %s\n" % item)
def _write_exclusion_file( self, model, filename ):
self._exclusion_filename = filename
with open(self._exclusion_filename,'w') as fh:
for ex in model.get_exclusions():
item = tuple(int(p.idx) for p in ex)
fh.write("EXCLUDE %d %d\n" % item)
def _write_bond_angle_file( self, model, filename ):
self._bond_angle_filename = filename
if len(model.bond_angles) > 0:
with open(self._bond_angle_filename,'w') as fh:
for b in model.get_bond_angles():
bfiles = []
for p in b[-1]:
try:
bfile = p.filename()
except:
bfile = str(p)
bfiles.append(bfile)
item = tuple([p.idx for p in b[:-1]] + bfiles)
fh.write("BONDANGLE %d %d %d %d %s %s %s\n" % item)
def _write_product_potential_file( self, model, filename ):
self._product_potential_filename = filename
if len(model.product_potentials) > 0:
with open(self._product_potential_filename,'w') as fh:
for pot in model.get_product_potentials():
line = "PRODUCTPOTENTIAL "
for ijk_tb in pot:
ijk = ijk_tb[:-1]
tb = ijk_tb[-1]
if type(tb) is tuple or type(tb) is list:
if len(tb) != 2: raise ValueError("Invalid product potential")
type_,b = tb
if type(type_) is not str: raise ValueError("Invalid product potential: unrecognized specification of potential type")
else:
type_ = ""
b = tb
if type(b) is not str and not isinstance(b, Path):
b.write_file()
try:
bfile = b.filename()
except:
bfile = str(b)
line = line+" ".join([str(x.idx) for x in ijk])+" "
line = line+" ".join([str(x) for x in [type_,bfile] if x != ""])+" "
fh.write(line)
def _write_group_sites_file( self, model, filename ):
self._group_sites_filename = filename
if len(model.group_sites) > 0:
with open(self._group_sites_filename,'w') as fh:
for i,g in enumerate(model.group_sites):
assert( i+len(model.particles) == g.idx )
ids = " ".join([str(int(p.idx)) for p in g.particles])
fh.write("GROUP %s\n" % ids)
def _write_rb_coordinate_file( self, model, configuration, filename ):
self._rb_coordinate_filename = filename
if len(model.rigid_bodies) > 0:
rb_integrator = configuration.rigid_body_integrator
if rb_integrator is None: rb_integrator = configuration.integrator
with open(self._rb_coordinate_filename,'w') as fh:
for rb in model.rigid_bodies:
o = list(rb.applyOrientation(rb.orientation).flatten())
if len(o) != 9: raise ValueError('Rigid body orientation should be a 3x3 matrix')
fh.write(' '.join(map(str,list(rb.get_collapsed_position()) + o)))
if rb_integrator in ('MD','Langevin'):
try: fh.write( ' ' + ' '.join([str(x) for x in rb.momentum]) )
except: fh.write( ' 0'*3 )
try: fh.write( ' ' + ' '.join([str(x) for x in rb.rotational_momentum]) )
except: fh.write( ' 0'*3 )
fh.write('\n')
def _write_conf(self, model, prefix, configuration):
# num_steps=100000000, output_period=10000, restart_file=None,
## TODO: raise exception if _write_potential_files has not yet been called
filename = f'{prefix}.bd'
## Create helper function
def _fix_path(filename, rb_type=None):
if rb_type and rb_type in self.rb_type_dirs:
# First, check if this is an existing file within the RB directory
rb_dir = self.rb_type_dirs[rb_type]
basename = os.path.basename(str(filename))
rb_path = os.path.join(rb_dir, basename)
if os.path.exists(rb_path):
return rb_path
# Check if this might be a resource we want to place in the RB directory
if basename.startswith(rb_type) or rb_type in str(filename):
return rb_path
abspath = str(filename) if str(filename)[0] == '/' else Path(model._d_orig) / filename
ret = None
try:
ret = os.path.relpath(str(abspath))
except:
devlogger.info(f'Relative path for {filename} not found... using {abspath}')
ret = abspath
return str(ret)
## Build dictionary of parameters
params = dict()
for k,v in configuration.items():
params[k] = v
if configuration.seed is None:
params['seed'] = f'seed {int(np.random.default_rng().integers(1,99999,1))}'
else:
params['seed'] = "seed {:d}".format(configuration.seed)
params['num_steps'] = int(configuration.num_steps)
# params['coordinateFile'] = "%s.coord.txt" % prefix
params['particle_file'] = "%s.particles.txt" % prefix
if configuration.restart_file is None:
params['restart_coordinates'] = ""
else:
params['restart_coordinates'] = "restartCoordinates %s" % configuration.restart_file
for k,v in zip('XYZ', model.dimensions):
params['dim'+k] = v
if model.origin is None:
for k,v in zip('XYZ', model.dimensions):
params['origin'+k] = -v*0.5
else:
for k,v in zip('XYZ', model.origin):
params['origin'+k] = v
if params['pairlist_distance'] is None:
params['pairlist_distance'] = 10
else:
params['pairlist_distance'] -= params['cutoff']
if params['integrator'] == 'MD':
params['integrator'] = 'Langevin'
if len(model.rigid_bodies) > 0:
rb_integrator = params['rigid_body_integrator']
if rb_integrator is None: rb_integrator = params["integrator"]
params['rigid_body_integrator'] = f'\nRigidBodyDynamicType {rb_integrator}'
_rbggp = params["rigid_body_grid_grid_period"]
if _rbggp is not None and _rbggp > 1:
params['rigid_body_integrator'] += f'\nrigidBodyGridGridPeriod {_rbggp}'
else:
params['rigid_body_integrator'] = ''
## Actually write the file
with open(filename,'w') as fh:
fh.write("""{seed}
timestep {timestep}
steps {num_steps}
numberFluct 0 # deprecated
interparticleForce 1 # other values deprecated
fullLongRange 0 # deprecated
temperature {temperature}
ParticleDynamicType {integrator}{rigid_body_integrator}
outputPeriod {output_period}
## Energy doesn't actually get printed!
outputEnergyPeriod {output_period}
outputFormat dcd
## Infrequent domain decomposition because this kernel is still very slow
decompPeriod {decomp_period}
cutoff {cutoff}
pairlistDistance {pairlist_distance}
origin {originX} {originY} {originZ}
systemSize {dimX} {dimY} {dimZ}
{extra_bd_file_lines}
\n""".format(extra_bd_file_lines=self.extra_bd_file_lines, **params))
## Write entries for each type of particle
for pt,(num,num_rigid) in model.getParticleTypesAndCounts():
if num+num_rigid == 0: continue
devlogger.debug(f'Writing configuration for particle type {pt}')
## TODO create new particle types if existing has grid
particleParams = pt.__dict__.copy()
particleParams['num'] = num
if configuration.integrator in ('Brown', 'Brownian', 'BD'):
try:
D = pt.diffusivity
except:
""" units "k K/(amu/ns)" "AA**2/ns" """
D = 831447.2 * configuration.temperature / (pt.mass * pt.damping_coefficient)
particleParams['dynamics'] = 'diffusion {D}'.format(D = D)
elif configuration.integrator in ('MD','Langevin','FusDynamic'):
try:
gamma = pt.damping_coefficient
if gamma is None: raise
except:
""" units "k K/(AA**2/ns)" "amu/ns" """
gamma = 831447.2 * configuration.temperature / (pt.mass*pt.diffusivity)
particleParams['dynamics'] = """mass {mass}
transDamping {g} {g} {g}""".format(mass=pt.mass, g=gamma)
else:
raise ValueError("Unrecognized particle integrator '{}'".format(configuration.particle_integrator))
fh.write("""
particle {name}
num {num}
{dynamics}
""".format(**particleParams))
if 'grid_potentials' in particleParams:
grids = []
scales = []
boundary_conditions = []
for vals in pt.grid_potentials:
try: g,s,bc = vals
except:
logger.warning(f'Failed to unpack {pt}.grid_potentials, presumably due to lack of specified boundary condition... using "dirichlet"')
g,s = vals
bc = 'dirichlet'
grids.append( _fix_path(g) )
scales.append(str(s))
boundary_conditions.append(bc)
fh.write("gridFile {}\n".format(" ".join(grids)))
fh.write("gridFileScale {}\n".format(" ".join(scales)))
if any([bc != 'dirichlet' for bc in boundary_conditions]):
fh.write(f'gridFileBoundaryConditions {" ".join(boundary_conditions)}'+'\n')
else:
fh.write("gridFile {}/null.dx\n".format(self.potential_directory))
if 'forceXGrid' in particleParams:
fh.write(f"forceXGridFile {_fix_path(pt.forceXGrid)}\n")
if 'forceYGrid' in particleParams:
fh.write(f"forceYGridFile {_fix_path(pt.forceYGrid)}\n")
if 'forceZGrid' in particleParams:
fh.write(f"forceZGridFile {_fix_path(pt.forceZGrid)}\n")
if any(f'force{x}Grid' in particleParams for x in ('X','Y','Z')) and \
'forceGridScale' in particleParams:
_scale = None
if isinstance( pt.forceGridScale, float ):
_scale = 3*[pt.forceGridScale]
elif len( pt.forceGridScale ) == 1:
_scale = 3*[pt.forceGridScale[0]]
elif len( pt.forceGridScale ) == 3:
_scale = pt.forceGridScale
else:
raise ValueError(f'Unrecognized format for ParticleType.forceGridScale: "{pt.forceGridScale}"')
fh.write(f"forceGridScale {' '.join(map(str,_scale))}")
if 'rigid_body_potentials' in particleParams:
grids = []
scales = []
for item in pt.rigid_body_potentials:
try: keyword,s = item
except: keyword,s = (item,1)
fh.write(f"rigidBodyPotential {keyword}\n")
if s != 1: raise NotImplementedError('Instead scale rigid body potential')
## Write coordinates and interactions
fh.write("""
## Input coordinates
inputParticles {particle_file}
{restart_coordinates}
## Interaction potentials
tabulatedPotential 1
## The i@j@file syntax means particle type i will have NB interactions with particle type j using the potential in file
""".format(**params))
for pair,f in zip(model._particleTypePairIter(), model._nonbonded_interaction_files):
if f is not None:
i,j,t1,t2 = pair
fh.write("tabulatedFile %d@%d@%s\n" % (i,j,f))
## Bonded interactions
restraints = model.get_restraints()
bonds = model.get_bonds()
angles = model.get_angles()
dihedrals = model.get_dihedrals()
vector_angles = model.get_vector_angles()
exclusions = model.get_exclusions()
bond_angles = model.get_bond_angles()
prod_pots = model.get_product_potentials()
# group_sites = model.get_group_sites()
group_sites = model.group_sites
if len(bonds) > 0:
for b in model._get_bond_potentials():
try:
bfile = b.filename()
except:
bfile = str(b)
fh.write("tabulatedBondFile %s\n" % bfile)
if len(angles) > 0:
for b in model._get_angle_potentials():
try:
bfile = b.filename()
except:
bfile = str(b)
fh.write("tabulatedAngleFile %s\n" % bfile)
if len(vector_angles) > 0:
for b in list(set([b for i,j,k,l,b in vector_angles])):
try:
bfile = b.filename()
except:
bfile = str(b)
fh.write("tabulatedVecangleFile %s\n" % bfile)
if len(dihedrals) > 0:
for b in list(set([b for i,j,k,l,b in dihedrals])):
try:
bfile = b.filename()
except:
bfile = str(b)
fh.write("tabulatedDihedralFile %s\n" % bfile)
if len(vector_angles) > 0:
for b in list(set([b for i,j,k,l,b in vector_angles])):
try:
bfile = b.filename()
except:
bfile = str(b)
fh.write("tabulatedVecangleFile %s\n" % bfile)
if len(restraints) > 0:
fh.write("inputRestraints %s\n" % self._restraint_filename)
if len(bonds) > 0:
fh.write("inputBonds %s\n" % self._bond_filename)
if len(angles) > 0:
fh.write("inputAngles %s\n" % self._angle_filename)
if len(dihedrals) > 0:
fh.write("inputDihedrals %s\n" % self._dihedral_filename)
if len(vector_angles) > 0:
fh.write("inputVecangles %s\n" % self._vector_angle_filename)
if len(exclusions) > 0:
fh.write("inputExcludes %s\n" % self._exclusion_filename)
if len(vector_angles) > 0:
fh.write("inputVecangles %s\n" % self._vector_angle_filename)
if len(bond_angles) > 0:
fh.write("inputBondAngles %s\n" % self._bond_angle_filename)
if len(prod_pots) > 0:
fh.write("inputProductPotentials %s\n" % self._product_potential_filename)
if len(group_sites) > 0:
fh.write("inputGroups %s\n" % self._group_sites_filename)
if len(model.rigid_bodies) > 0:
for rbi,num in model.rigid_body_type_counts:
rbt=model.rigid_body_index[rbi]
if num == 0: continue
devlogger.debug(f'Writing configuration for rigid body type {rbt}')
## For now, we always convert rigid body diffusivity into mass+damping
try:
gamma = rbt.damping_coefficient
except:
""" units "k K/(AA**2/ns)" "dalton/ns" """
gamma = 831447.2 * configuration.temperature / (rbt.mass*np.array(rbt.diffusivity))
if len(gamma) == 1:
logger.warn(f'Using single diffusion coefficient for all motions along all rigid body principal axes for {rbt}')
gamma = 3*[gamma]
try:
gamma_rot = rbt.rotational_damping_coefficient
except:
""" units "k K/(1/ns)" "AA**2 dalton/ns" """
gamma_rot = 831447.2 * configuration.temperature / (np.array(rbt.moment_of_inertia)*np.array(rbt.diffusivity))
if len(gamma_rot) == 1:
logger.warn(f'Using single rotational diffusion coefficient for all motions along all rigid body principal axes for {pt}')
gamma_rot = 3*[gamma_rot]
if len(gamma_rot) != 3: raise ValueError('Expected a three-element rotational diffusion coefficient for rigid bodies')
fh.write(f"""
rigidBody {rbt.name}
num {num}
mass {rbt.mass}
inertia {' '.join(map(str,rbt.moment_of_inertia))}
transDamping {' '.join(map(str,gamma))}
rotDamping {' '.join(map(str,gamma_rot))}
""")
for item in rbt.potential_grids:
try: keyword,g,s = item
except: (keyword,g),s = (item,1)
g = _fix_path(g,rbt.name)
fh.write(f"potentialGrid {keyword} {g}\n")
if s != 1: fh.write(f"potentialGridScale {keyword} {s}\n")
for item in rbt.charge_grids:
try: keyword,g,s = item
except: (keyword,g),s = (item,1)
g = _fix_path(g,rbt.name)
fh.write(f"densityGrid {keyword} {g}\n")
if s != 1: fh.write(f"densityGridScale {keyword} {s}\n")
for item in rbt.pmf_grids:
try: keyword,g,s = item
except: (keyword,g),s = (item,1)
g = _fix_path(g,rbt.name)
fh.write(f"gridFile {keyword} {g}\n")
if s != 1: fh.write(f"pmfScale {keyword} {s}\n")
## AttachedParticles
if len(rbt.attached_particles) > 0:
fh.write(f'attachedParticles {rbt._attached_particles_filename}\n')
fh.write(f'\ninputRBCoordinates {self._rb_coordinate_filename}\n')
...
write_null_dx = False
for pt,(num,num_rb) in model.getParticleTypesAndCounts():
if num+num_rb == 0: continue
if "grid_potentials" not in pt.__dict__:
gridfile = "{}/null.dx".format(self.potential_directory)
with open(gridfile, 'w') as fh:
fh.write("""object 1 class gridpositions counts 2 2 2
origin {originX} {originY} {originZ}
delta {dimX} 0.000000 0.000000
delta 0.000000 {dimY} 0.000000
delta 0.000000 0.000000 {dimZ}
object 2 class gridconnections counts 2 2 2
object 3 class array type float rank 0 items 8 data follows
0.0 0.0 0.0
0.0 0.0 0.0
0.0 0.0
attribute "dep" string "positions"
object "density" class field
component "positions" value 1
component "connections" value 2
component "data" value 3
""".format(**params))
break
[docs]
class NamdEngine(SimEngine):
""" Partial interface to NAMD simulation engine """
def __init__(self, configuration=None, **conf_params):
if configuration is None:
configuration = SimConf(**conf_params)
SimEngine.__init__(self, configuration)
self.num_particles = 0
self.particles = [] # TODO decide if this should belong here, or with model
self.type_counts = None # TODO decide if this should belong here, or with model
self.cacheUpToDate = False
@property
def default_binary(self):
return 'namd2'
def _generate_command_string(self, binary, output_name, output_directory, num_procs=1, gpu=None, replicas=1):
cmd = [binary]
if gpu is not None and len(gpu) > 0:
cmd.extend(['+devices'] + [','.join([str(int(g)) for g in gpu])])
cmd.extend([f'+p{str(num_procs)}'])
if replicas > 1:
raise NotImplementedError
cmd = cmd + [f'{output_name}.namd']
cmd = tuple(str(x) for x in cmd)
return cmd
[docs]
def get_default_conf(self):
conf = SimConf(num_steps=1e5, output_period=1e3,
integrator='MD', timestep=2e-6, thermostat='Langevin', barostat=None,
temperature=295, pressure=1,
cutoff=12, pairlist_distance=14, decomp_period=12,
seed=None, restart_file=None)
return conf
[docs]
def write_simulation_files(self, model, output_name, configuration=None, write_pqr=False, copy_ff_from=get_resource_path("charmm36.nbfix"), **conf_params):
if configuration is None:
configuration = self._get_combined_conf(model, **conf_params)
model.write_psf( output_name+'.psf' )
model.write_pdb( output_name+'.pdb' )
model.write_pdb( output_name + ".fixed.pdb", beta_from_fixed=True )
if write_pqr: model.write_pqr( output_name + ".pqr" )
if copy_ff_from is not None and copy_ff_from != '':
try:
shutil.copytree( copy_ff_from, Path(copy_ff_from).stem )
except FileExistsError:
pass
self._write_conf( model, output_name, configuration )
[docs]
def write_conf( self,model,output_name, minimization_steps=4800, num_steps = 1e6,
output_directory = 'output',
update_dimensions=True, extrabonds=True ):
""" Write a NAMD configuration file (developed for the mrdna package) """
num_steps = int(num_steps//12)*12
minimization_steps = int(minimization_steps//24)*24
if num_steps < 12:
raise ValueError("Must run with at least 12 steps")
if minimization_steps < 24:
raise ValueError("Must run with at least 24 minimization steps")
format_data = model.__dict__.copy() # get parameters from System object
format_data['extrabonds'] = """extraBonds on
extraBondsFile $prefix.exb
""" if extrabonds else ""
if self.useTclForces:
format_data['margin'] = ""
format_data['tcl_forces'] = """tclForces on
tclForcesScript $prefix.forces.tcl
"""
else:
format_data['margin'] = """margin 30
"""
format_data['tcl_forces'] = ""
if update_dimensions:
format_data['dimensions'] = model.dimensions_from_structure()
for k,v in zip('XYZ', format_data['dimensions']):
format_data['origin'+k] = -v*0.5
format_data['cell'+k] = v
format_data['prefix'] = output_name
format_data['minimization_steps'] = int(minimization_steps//2)
format_data['num_steps'] = num_steps
format_data['output_directory'] = output_directory
filename = '{}.namd'.format(output_name)
with open(filename,'w') as fh:
fh.write("""
set prefix {prefix}
set nLast 0; # increment when continueing a simulation
set n [expr $nLast+1]
set out {output_directory}/$prefix-$n
set temperature {temperature}
structure $prefix.psf
coordinates $prefix.pdb
outputName $out
XSTfile $out.xst
DCDfile $out.dcd
#############################################################
## SIMULATION PARAMETERS ##
#############################################################
# Input
paraTypeCharmm on
parameters charmm36.nbfix/par_all36_na.prm
parameters charmm36.nbfix/par_water_ions_na.prm
wrapAll off
# Force-Field Parameters
exclude scaled1-4
1-4scaling 1.0
switching on
switchdist 8
cutoff 10
pairlistdist 12
{margin}
# Integrator Parameters
timestep 2.0 ;# 2fs/step
rigidBonds all ;# needed for 2fs steps
nonbondedFreq 1
fullElectFrequency 3
stepspercycle 12
# PME (for full-system periodic electrostatics)
PME no
PMEGridSpacing 1.2
# Constant Temperature Control
langevin on ;# do langevin dynamics
# langevinDamping 1 ;# damping coefficient (gamma); used in original study
langevinDamping 0.1 ;# less friction for faster relaxation
langevinTemp $temperature
langevinHydrogen off ;# don't couple langevin bath to hydrogens
# output
useGroupPressure yes
xstFreq 4800
outputEnergies 4800
dcdfreq 4800
restartfreq 48000
#############################################################
## EXTRA FORCES ##
#############################################################
# ENM and intrahelical extrabonds
{extrabonds}
{tcl_forces}
#############################################################
## RUN ##
#############################################################
# Continuing a job from the restart files
cellBasisVector1 {cellX} 0 0
cellBasisVector2 0 {cellY} 0
cellBasisVector3 0 0 {cellZ}
if {{$nLast == 0}} {{
temperature 300
fixedAtoms on
fixedAtomsForces on
fixedAtomsFile $prefix.fixed.pdb
fixedAtomsCol B
minimize {minimization_steps:d}
fixedAtoms off
minimize {minimization_steps:d}
}} else {{
bincoordinates {output_directory}/$prefix-$nLast.restart.coor
binvelocities {output_directory}/$prefix-$nLast.restart.vel
}}
run {num_steps:d}
""".format(**format_data))
""" External Runners
Integration with HydroPro and APBS for shape rigid body calculations.
Original script by Chun Kit Chan, 2024
HydroProRunner and APBSRunner module provides interfaces to external tools used in shape rigid body modeling:
- HydroPro for hydrodynamic calculations
- APBS for electrostatics calculations
"""
[docs]
class HydroProRunner:
"""Interface to HydroPro for hydrodynamic calculations"""
def __init__(self, mass, simconf=None, structure_name="hydrocal", cal_type="shape"):
"""Initialize HydroPro interface.
Args:
mass: Mass in AMU
simconf: SimConf object with temperature, viscosity and solvent_density (optional)
structure_name: Base name of structure files
cal_type: shape or mesh, determined by program
"""
if simconf is None:
from . import DefaultSimConf
simconf = DefaultSimConf()
self.temperature = simconf.temperature
self.viscosity = simconf.viscosity
self.solvent_density = simconf.solvent_density
self.binary = simconf.get_binary('hydropro')
# Check if binary exists
if self.binary is None:
raise FileNotFoundError("HydroPro binary not found. Please configure in simconf.")
self.binary = Path(self.binary)
if not self.binary.exists():
raise FileNotFoundError(f"HydroPro binary not found at {self.binary}")
# Make binary executable if needed (Unix only)
if platform.system() != 'Windows' and not os.access(self.binary, os.X_OK):
os.chmod(self.binary, 0o755)
self.structure_name = structure_name
self.mass = mass
self.cal_type = cal_type
[docs]
def write_config(self, output_path="hydropro.dat",
aer=2.9,nsig=6,sig_min=1,sig_max=2,specific_volume=0.702,):
"""Write HydroPro configuration file with explicit parameters.
Args:
output_path: Path to write config file
cal_type: shape(0) or mesh(1)
structure_name: Name of the molecule/structure
mass: Molecular weight in Daltons (amu)
aer: Atomic Element Radius in Angstroms
nsig: Number of values of the shell thickness
sig_min: Minimum radius of beads in the shell (Angstroms)
sig_max: Maximum radius of beads in the shell (Angstroms)
specific_volume: Partial specific volume in cm³/g
"""
temperature_c = self.temperature - 273.15 # Convert K to C
if self.cal_type=="mesh" or self.cal_type==1:
aer=10
nsig=4
sig_min=10
sig_max=20
specific_volume=1
with open(output_path, 'w') as f:
# Basic identification
f.write(f"{self.structure_name}\n") # Name of molecule
f.write(f"{self.structure_name}.hydro\n") # Base name for output files
f.write("hydro.pdb\n")
f.write("1\n") # Calculation type always 1 (bead surface model)
# Bead model parameters
f.write(f"{aer},\n") # AER (radius in Angstroms)
f.write(f"{nsig},\n") # NSIG (values of shell thickness)
f.write(f"{sig_min},\n") # SIGMIN (min bead radius)
f.write(f"{sig_max},\n") # SIGMAX (max bead radius)
# Physical parameters
f.write(f"{temperature_c},\n") # Temperature in Celsius
f.write(f"{self.viscosity},\n") # Solvent viscosity in poise
f.write(f"{self.mass},\n") # Molecular weight in Daltons
f.write(f"{specific_volume},\n") # Partial specific volume
f.write(f"{self.solvent_density}\n") # Solvent density
# Calculation control parameters
f.write("-1\n") # Number of Q values
f.write("-1\n") # Number of intervals
f.write("0\n") # Monte Carlo trials
f.write("1\n") # IDIF=1 (yes) for full diffusion tensors
f.write("*") # End marker
[docs]
def parse_output(self, output_file):
mass=self.mass
"""Parse HydroPro output file to get damping coefficients.
Args:
output_file: Path to HydroPro output file
mass: Mass in AMU used to normalize coefficients
Returns:
tuple of (translation_damping, rotation_damping)
"""
with open(output_file) as f:
lines = f.readlines()
mass=self.mass
# Skip header
line_num = 48
# Read translational coefficients
Dx = float(lines[line_num].strip().split()[0])
Dy = float(lines[line_num+1].strip().split()[1])
Dz = float(lines[line_num+2].strip().split()[2])
# Skip two lines
line_num += 5
# Read rotational coefficients
Rx = float(lines[line_num].strip().split()[3])
Ry = float(lines[line_num+1].strip().split()[4])
Rz = float(lines[line_num+2].strip().split()[5])
# Convert units
# Translation: "(295 k K) / (( cm^2/s) * amu)" "1/ns"
trans_damp = [24.527692/(x*mass) for x in [Dx, Dy, Dz]]
# Rotation: "(295 k K) / ((1 /s) * amu AA^2)" "1/ns"
rot_damp = [2.4527692e+17 / (x*mass) for x in [Rx, Ry, Rz]]
return trans_damp, rot_damp
[docs]
def run_calculation(self,work_dir="."):
"""Run HydroPro calculation.
Args:
structure_name: Base name of structure files
mass: Mass in AMU
work_dir: Working directory for calculation
Returns:
Dictionary containing:
- translation_damping: [Dx, Dy, Dz]
- rotation_damping: [Rx, Ry, Rz]
"""
structure_name, mass=self.structure_name, self.mass
original_dir = os.getcwd()
try:
os.chdir(work_dir)
# Write config
self.write_config()
# Link structure file
pdb_path = Path(f"{structure_name}.pdb")
if not pdb_path.exists():
raise FileNotFoundError(f"Structure file not found: {pdb_path}")
os.symlink(pdb_path, "hydro.pdb")
# Run HydroPro
result = subprocess.run([str(self.binary)],
capture_output=True,
text=True,
check=True)
# Parse results
trans_damp, rot_damp = self.parse_output(f"{structure_name}.hydro-res.txt")
return {
"translation_damping": trans_damp,
"rotation_damping": rot_damp
}
finally:
if os.path.exists("hydro.pdb"):
os.unlink("hydro.pdb")
os.chdir(original_dir)
[docs]
class APBSRunner:
"""Interface to APBS for electrostatics calculations"""
def __init__(self,simconf=None, binary_path=None, psize_script=None):
"""Initialize APBS interface.
Args:
binary_path: Path to APBS executable. If None, uses path from simconf
psize_script: Optional path to psize script
simconf: SimConf object (optional)
"""
# Get binary path from simconf if provided and no explicit binary_path
if binary_path is None and simconf is not None:
binary_path = simconf.get_binary('apbs')
# If still None, try binary_manager
if binary_path is None:
from .sim_config import binary_manager
binary_path = binary_manager.get_binary_path('apbs')
# If still None, use 'apbs' and rely on PATH
if binary_path is None:
binary_path = 'apbs'
self.binary = Path(binary_path)
# Check if binary exists or is in PATH
if not self.binary.exists() and not shutil.which(str(self.binary)):
raise FileNotFoundError(f"APBS binary not found at {binary_path}")
self.psize = Path(psize_script) if psize_script else None
[docs]
def write_config(self, structure_name, xyz_dims, salt_conc=0.15,
temperature=300, buffer=50, large_system='Off'):
"""Write APBS configuration file.
Args:
structure_name: Base name of structure files
xyz_dims: [x, y, z] dimensions
salt_conc: Salt concentration in M
temperature: Temperature in K
buffer: Grid buffer size in Å
large_system: 'On' or 'Off' for large system mode
"""
# Calculate grid dimensions
xyz_cg = [str(int(dim + buffer)) for dim in xyz_dims]
if large_system == 'Off':
xyz_dime = xyz_cg
center = 'mol 1'
else:
# For large systems, reduce grid density
dividend = 2
xyz_dime = [str(int((dim + buffer) / dividend)) for dim in xyz_dims]
center = 'mol 1'
config = f"""read
mol pqr {structure_name}.pqr
end
elec
mg-auto
dime {' '.join(xyz_dime)}
cglen {' '.join(xyz_cg)}
cgcent {center}
fglen {' '.join(xyz_cg)}
fgcent {center}
mol 1
npbe
bcfl sdh
srfm smol
chgm spl2
ion 1 {salt_conc} 2.0
ion -1 {salt_conc} 2.0
pdie 12.0
sdie 78.54
sdens 10.0
srad 1.4
swin 0.3
temp {temperature}
gamma 0.105
calcenergy no
calcforce no
write pot dx {structure_name}.elec.tmp
end
quit"""
with open(f"{structure_name}.apbs", 'w') as f:
f.write(config)
[docs]
def run_calculation(self, structure_name, xyz_dims, work_dir=".",
salt_conc=0.15, temperature=300):
"""Run APBS calculation.
Args:
structure_name: Base name of structure files
xyz_dims: [x, y, z] dimensions
work_dir: Working directory
salt_conc: Salt concentration in M
temperature: Temperature in K
Returns:
Path to output potential file
"""
original_dir = os.getcwd()
try:
os.chdir(work_dir)
# Write config
self.write_config(structure_name, xyz_dims, salt_conc, temperature)
# Run APBS
result = subprocess.run([str(self.binary), f"{structure_name}.apbs"],
capture_output=True,
text=True,
check=True)
output_file = Path(f"{structure_name}.elec.tmp")
if not output_file.exists():
raise RuntimeError("APBS failed to generate output file")
return output_file
finally:
os.chdir(original_dir)