Source code for mdpy.io.hdf5_writer

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
file : hdf5_writer.py
created time : 2022/02/24
author : Zhenyu Wei
copyright : (C)Copyright 2021-present, mdpy organization
"""

import h5py
import numpy as np
from mdpy.core import Topology
from mdpy.utils import check_pbc_matrix, check_quantity_value
from mdpy.unit import *
from mdpy.error import *
from mdpy.environment import *

NONE_LABEL = 10000


[docs]class HDF5Writer:
[docs] def __init__( self, file_path: str, mode: str = "w", topology: Topology = Topology(), pbc_matrix=np.diag([1] * 3).astype(NUMPY_FLOAT), ) -> None: if not file_path.endswith(".hdf5"): raise FileFormatError("The file should end with .hdf5 suffix") self._file_path = file_path self._mode = mode if not isinstance(topology, Topology): raise TypeError( "The topology attribute should be the instance of mdpy.core.Topology class" ) self._topology = topology pbc_matrix = check_quantity_value(pbc_matrix, default_length_unit) self._pbc_matrix = check_pbc_matrix(pbc_matrix) with h5py.File(self._file_path, self._mode) as f: f.create_group("topology") f.create_group("positions") f["positions/num_frames"] = 0 f["pbc_matrix"] = self._pbc_matrix self._write_topology() self._write_pbc_matrix() self._cur_frame = 0
def write(self, positions=None): shape, is_shape_error = positions.shape, False is_2d_array = True if len(shape) == 2 else False if is_2d_array and shape[0] != self._topology.num_particles: is_shape_error = True elif not is_2d_array and len(shape) != 3: is_shape_error = True elif not is_2d_array and shape[1] != self._topology.num_particles: is_shape_error = True if is_shape_error: raise ArrayDimError( "The topology contain %s particles while a positions array with shape %s is provided" % (self._topology.num_particles, list(shape)) ) with h5py.File(self._file_path, "a") as h5f: if is_2d_array: h5f["positions/frame-%s" % self._cur_frame] = positions self._cur_frame += 1 else: for frame in range(shape[0]): h5f["positions/frame-%d" % self._cur_frame] = positions[frame, :, :] self._cur_frame += 1 del h5f["positions/num_frames"] h5f["positions/num_frames"] = self._cur_frame def _write_pbc_matrix(self): with h5py.File(self._file_path, "a") as h5f: del h5f["pbc_matrix"] h5f["pbc_matrix"] = self._pbc_matrix def _write_topology(self): with h5py.File(self._file_path, "a") as h5f: del h5f["topology"] h5f.create_group("topology") h5f.create_group("topology/particles") # Assign data particle_id = np.empty([self._topology.num_particles], dtype=NUMPY_INT) particle_type = [] particle_name = [] matrix_id = np.empty([self._topology.num_particles], dtype=NUMPY_INT) molecule_id = np.empty([self._topology.num_particles], dtype=NUMPY_INT) molecule_type = [] chain_id = [] mass = np.empty([self._topology.num_particles], dtype=NUMPY_FLOAT) charge = np.empty([self._topology.num_particles], dtype=NUMPY_FLOAT) for index, particle in enumerate(self._topology.particles): particle_id[index] = self._check_none(particle.particle_id, NUMPY_INT) particle_type.append(self._check_none(particle.particle_type, str)) particle_name.append(self._check_none(particle.particle_name, str)) matrix_id[index] = self._check_none(particle.matrix_id, NUMPY_INT) molecule_id[index] = self._check_none(particle.molecule_id, NUMPY_INT) molecule_type.append(self._check_none(particle.molecule_type, str)) chain_id.append(self._check_none(particle.chain_id, str)) mass[index] = self._check_none(particle.mass, NUMPY_FLOAT) charge[index] = self._check_none(particle.charge, NUMPY_FLOAT) h5f["topology/particles/particle_id"] = particle_id h5f["topology/particles/particle_type"] = particle_type h5f["topology/particles/particle_name"] = particle_name h5f["topology/particles/matrix_id"] = matrix_id h5f["topology/particles/molecule_id"] = molecule_id h5f["topology/particles/molecule_type"] = molecule_type h5f["topology/particles/chain_id"] = chain_id h5f["topology/particles/mass"] = mass h5f["topology/particles/charge"] = charge h5f["topology/num_particles"] = NUMPY_INT(self._topology.num_particles) h5f["topology/bonds"] = np.array(self._topology.bonds).astype(NUMPY_INT) h5f["topology/num_bonds"] = NUMPY_INT(self._topology.num_bonds) h5f["topology/angles"] = np.array(self._topology.angles).astype(NUMPY_INT) h5f["topology/num_angles"] = NUMPY_INT(self._topology.num_angles) h5f["topology/dihedrals"] = np.array(self._topology.dihedrals).astype( NUMPY_INT ) h5f["topology/num_dihedrals"] = NUMPY_INT(self._topology.num_dihedrals) h5f["topology/impropers"] = np.array(self._topology.impropers).astype( NUMPY_INT ) h5f["topology/num_impropers"] = NUMPY_INT(self._topology.num_impropers) @staticmethod def _check_none(val, target_type): return ( target_type(val) if not isinstance(val, type(None)) else target_type(NONE_LABEL) ) @property def file_path(self): return self._file_path @property def mode(self): return self._mode @property def topology(self): return self._topology @topology.setter def topology(self, topology: Topology): if not isinstance(topology, Topology): raise TypeError( "The topology attribute should be the instance of mdpy.core.Topology class" ) self._topology = topology self._write_topology() @property def pbc_matrix(self): return self._pbc_matrix @pbc_matrix.setter def pbc_matrix(self, pbc_matrix: np.ndarray): pbc_matrix = check_quantity_value(pbc_matrix, default_length_unit) self._pbc_matrix = check_pbc_matrix(pbc_matrix) self._write_pbc_matrix()