Source code for dpdata.plugins.ase

from typing import TYPE_CHECKING, Optional, Type

import numpy as np

import dpdata
from dpdata.driver import Driver, Minimizer
from dpdata.format import Format

try:
    import ase.io
    from ase.calculators.calculator import PropertyNotImplementedError
    from ase.io import Trajectory

    if TYPE_CHECKING:
        from ase.optimize.optimize import Optimizer
except ImportError:
    pass


[docs] @Format.register("ase/structure") class ASEStructureFormat(Format): """Format for the `Atomic Simulation Environment <https://wiki.fysik.dtu.dk/ase/>`_ (ase). ASE supports parsing a few dozen of data formats. As described in i `the documentation <ihttps://wiki.fysik.dtu.dk/ase/ase/io/io.html>`_, many of these formats can be determined automatically. Use the `ase_fmt` keyword argument to supply the format if automatic detection fails. """
[docs] def from_system(self, atoms: "ase.Atoms", **kwargs) -> dict: """Convert ase.Atoms to a System. Parameters ---------- atoms : ase.Atoms an ASE Atoms, containing a structure **kwargs : dict other parameters Returns ------- dict data dict """ symbols = atoms.get_chemical_symbols() atom_names = list(dict.fromkeys(symbols)) atom_numbs = [symbols.count(symbol) for symbol in atom_names] atom_types = np.array([atom_names.index(symbol) for symbol in symbols]).astype( int ) cells = atoms.cell[:] coords = atoms.get_positions() info_dict = { "atom_names": atom_names, "atom_numbs": atom_numbs, "atom_types": atom_types, "cells": np.array([cells]), "coords": np.array([coords]), "orig": np.zeros(3), "nopbc": not np.any(atoms.get_pbc()), } return info_dict
[docs] def from_labeled_system(self, atoms: "ase.Atoms", **kwargs) -> dict: """Convert ase.Atoms to a LabeledSystem. Energies and forces are calculated by the calculator. Parameters ---------- atoms : ase.Atoms an ASE Atoms, containing a structure **kwargs : dict other parameters Returns ------- dict data dict Raises ------ RuntimeError ASE will raise RuntimeError if the atoms does not have a calculator """ info_dict = self.from_system(atoms) try: energies = atoms.get_potential_energy(force_consistent=True) except PropertyNotImplementedError: energies = atoms.get_potential_energy() forces = atoms.get_forces() info_dict = { **info_dict, "energies": np.array([energies]), "forces": np.array([forces]), } try: stress = atoms.get_stress(False) except PropertyNotImplementedError: pass else: virials = np.array([-atoms.get_volume() * stress]) info_dict["virials"] = virials return info_dict
[docs] def from_multi_systems( self, file_name: str, begin: Optional[int] = None, end: Optional[int] = None, step: Optional[int] = None, ase_fmt: Optional[str] = None, **kwargs, ) -> "ase.Atoms": """Convert a ASE supported file to ASE Atoms. It will finally be converted to MultiSystems. Parameters ---------- file_name : str path to file begin : int, optional begin frame index end : int, optional end frame index step : int, optional frame index step ase_fmt : str, optional ASE format. See the ASE documentation about supported formats **kwargs : dict other parameters Yields ------ ase.Atoms ASE atoms in the file """ frames = ase.io.read(file_name, format=ase_fmt, index=slice(begin, end, step)) yield from frames
[docs] def to_system(self, data, **kwargs): """Convert System to ASE Atom obj.""" from ase import Atoms structures = [] species = [data["atom_names"][tt] for tt in data["atom_types"]] for ii in range(data["coords"].shape[0]): structure = Atoms( symbols=species, positions=data["coords"][ii], pbc=not data.get("nopbc", False), cell=data["cells"][ii], ) structures.append(structure) return structures
[docs] def to_labeled_system(self, data, *args, **kwargs): """Convert System to ASE Atoms object.""" from ase import Atoms from ase.calculators.singlepoint import SinglePointCalculator structures = [] species = [data["atom_names"][tt] for tt in data["atom_types"]] for ii in range(data["coords"].shape[0]): structure = Atoms( symbols=species, positions=data["coords"][ii], pbc=not data.get("nopbc", False), cell=data["cells"][ii], ) results = {"energy": data["energies"][ii], "forces": data["forces"][ii]} if "virials" in data: # convert to GPa as this is ase convention # v_pref = 1 * 1e4 / 1.602176621e6 vol = structure.get_volume() # results['stress'] = data["virials"][ii] / (v_pref * vol) results["stress"] = -data["virials"][ii] / vol structure.calc = SinglePointCalculator(structure, **results) structures.append(structure) return structures
[docs] @Format.register("ase/traj") class ASETrajFormat(Format): """Format for the ASE's trajectory format <https://wiki.fysik.dtu.dk/ase/ase/io/trajectory.html#module-ase.io.trajectory>`_ (ase).' a `traj' contains a sequence of frames, each of which is an `Atoms' object. """
[docs] def from_system( self, file_name: str, begin: Optional[int] = 0, end: Optional[int] = None, step: Optional[int] = 1, **kwargs, ) -> dict: """Read ASE's trajectory file to `System` of multiple frames. Parameters ---------- file_name : str ASE's trajectory file begin : int, optional begin frame index end : int, optional end frame index step : int, optional frame index step **kwargs : dict other parameters Returns ------- dict_frames: dict a dictionary containing data of multiple frames """ traj = Trajectory(file_name) sub_traj = traj[begin:end:step] dict_frames = ASEStructureFormat().from_system(sub_traj[0]) for atoms in sub_traj[1:]: tmp = ASEStructureFormat().from_system(atoms) dict_frames["cells"] = np.append(dict_frames["cells"], tmp["cells"][0]) dict_frames["coords"] = np.append(dict_frames["coords"], tmp["coords"][0]) ## Correct the shape of numpy arrays dict_frames["cells"] = dict_frames["cells"].reshape(-1, 3, 3) dict_frames["coords"] = dict_frames["coords"].reshape(len(sub_traj), -1, 3) return dict_frames
[docs] def from_labeled_system( self, file_name: str, begin: Optional[int] = 0, end: Optional[int] = None, step: Optional[int] = 1, **kwargs, ) -> dict: """Read ASE's trajectory file to `System` of multiple frames. Parameters ---------- file_name : str ASE's trajectory file begin : int, optional begin frame index end : int, optional end frame index step : int, optional frame index step **kwargs : dict other parameters Returns ------- dict_frames: dict a dictionary containing data of multiple frames """ traj = Trajectory(file_name) sub_traj = traj[begin:end:step] ## check if the first frame has a calculator if sub_traj[0].calc is None: raise ValueError( "The input trajectory does not contain energies and forces, may not be a labeled system." ) dict_frames = ASEStructureFormat().from_labeled_system(sub_traj[0]) for atoms in sub_traj[1:]: tmp = ASEStructureFormat().from_labeled_system(atoms) dict_frames["cells"] = np.append(dict_frames["cells"], tmp["cells"][0]) dict_frames["coords"] = np.append(dict_frames["coords"], tmp["coords"][0]) dict_frames["energies"] = np.append( dict_frames["energies"], tmp["energies"][0] ) dict_frames["forces"] = np.append(dict_frames["forces"], tmp["forces"][0]) if "virials" in tmp.keys() and "virials" in dict_frames.keys(): dict_frames["virials"] = np.append( dict_frames["virials"], tmp["virials"][0] ) ## Correct the shape of numpy arrays dict_frames["cells"] = dict_frames["cells"].reshape(-1, 3, 3) dict_frames["coords"] = dict_frames["coords"].reshape(len(sub_traj), -1, 3) dict_frames["forces"] = dict_frames["forces"].reshape(len(sub_traj), -1, 3) if "virials" in dict_frames.keys(): dict_frames["virials"] = dict_frames["virials"].reshape(-1, 3, 3) return dict_frames
[docs] @Driver.register("ase") class ASEDriver(Driver): """ASE Driver. Parameters ---------- calculator : ase.calculators.calculator.Calculato ASE calculator """ def __init__(self, calculator: "ase.calculators.calculator.Calculator") -> None: """Setup the driver.""" self.calculator = calculator
[docs] def label(self, data: dict) -> dict: """Label a system data. Returns new data with energy, forces, and virials. Parameters ---------- data : dict data with coordinates and atom types Returns ------- dict labeled data with energies and forces """ # convert data to ase data system = dpdata.System(data=data) # list[Atoms] structures = system.to_ase_structure() labeled_system = dpdata.LabeledSystem() for atoms in structures: atoms.calc = self.calculator ls = dpdata.LabeledSystem( atoms, fmt="ase/structure", type_map=data["atom_names"] ) labeled_system.append(ls) return labeled_system.data
[docs] @Minimizer.register("ase") class ASEMinimizer(Minimizer): """ASE minimizer. Parameters ---------- driver : Driver dpdata driver optimizer : type, optional ase optimizer class fmax : float, optional, default=5e-3 force convergence criterion max_steps : int, optional max steps to optimize optimizer_kwargs : dict, optional other parameters for optimizer """ def __init__( self, driver: Driver, optimizer: Optional[Type["Optimizer"]] = None, fmax: float = 5e-3, max_steps: Optional[int] = None, optimizer_kwargs: dict = {}, ) -> None: self.calculator = driver.ase_calculator if optimizer is None: from ase.optimize import LBFGS self.optimizer = LBFGS else: self.optimizer = optimizer self.optimizer_kwargs = { "logfile": None, **optimizer_kwargs.copy(), } self.fmax = fmax self.max_steps = max_steps
[docs] def minimize(self, data: dict) -> dict: """Minimize the geometry. Parameters ---------- data : dict data with coordinates and atom types Returns ------- dict labeled data with minimized coordinates, energies, and forces """ system = dpdata.System(data=data) # list[Atoms] structures = system.to_ase_structure() labeled_system = dpdata.LabeledSystem() for atoms in structures: atoms.calc = self.calculator dyn = self.optimizer(atoms, **self.optimizer_kwargs) dyn.run(fmax=self.fmax, steps=self.max_steps) ls = dpdata.LabeledSystem( atoms, fmt="ase/structure", type_map=data["atom_names"] ) labeled_system.append(ls) return labeled_system.data