Source code for mdpy.io.log_writer

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
file : log_writer.py
created time : 2022/04/06
author : Zhenyu Wei
copyright : (C)Copyright 2021-present, mdpy organization
"""

import numpy as np
from datetime import datetime, timedelta
from mdpy.core import Ensemble
from mdpy.unit import *
from mdpy.utils import *
from mdpy.error import *


[docs]class LogWriter:
[docs] def __init__( self, file_path: str, time_step, step: bool = False, sim_time: bool = False, sim_speed: bool = False, potential_energy: bool = False, kinetic_energy: bool = False, total_energy: bool = False, temperature: bool = False, pressure: bool = False, volume: bool = False, density: bool = False, cpu_time: bool = False, rest_time: bool = False, total_step: int = 0, seperator: str = "\t", ) -> None: # Input if not file_path.endswith(".log"): raise FileFormatError("The file should end with .log suffix") self._file_path = file_path self._time_step = check_quantity_value(time_step, default_time_unit) self._step = step self._sim_time = sim_time self._potential_energy = potential_energy self._kinetic_energy = kinetic_energy self._total_energy = total_energy self._temperature = temperature self._pressure = pressure self._volume = volume self._density = density self._cpu_time = cpu_time self._rest_time = rest_time self._sim_speed = sim_speed self._total_step = total_step self._seperator = seperator if self._total_step == 0 and self._rest_time == True: raise IOPoorDefinedError( "mdpy.core.LogDumper cannot provide rest_time without specifying total_step" ) # Refresh file open(file_path, "w").close() # Variables self._initial_time = self._now() self._cur_write_time = self._now() self._pre_write_time = self._now() # Dump head self._write_header()
@staticmethod def _now(): return datetime.now().replace(microsecond=0) def _write_info(self, info): with open(self._file_path, "a") as f: print(info, file=f, end="") def write(self, ensemble: Ensemble, cur_step: int): self._cur_write_time = self._now() self._cpu_time = self._cur_write_time - self._initial_time write_info = "" if self._step: write_info += self._get_step(cur_step) + self._seperator if self._sim_time: write_info += self._get_sim_time(cur_step) + self._seperator if self._potential_energy: write_info += self._get_potential_energy(ensemble) + self._seperator if self._kinetic_energy: write_info += self._get_kinetic_energy(ensemble) + self._seperator if self._total_energy: write_info += self._get_total_energy(ensemble) + self._seperator if self._temperature: write_info += self._get_temperature(ensemble) + self._seperator if self._volume: write_info += self._get_volume(ensemble) + self._seperator if self._pressure: pass if self._density: write_info += self._get_density(ensemble) + self._seperator if self._cpu_time: write_info += self._get_cpu_time() + self._seperator if self._sim_speed: write_info += self._get_sim_speed(cur_step) + self._seperator if self._rest_time: write_info += self._get_rest_time(cur_step) + self._seperator if write_info != "": write_info += "\n" self._write_info(write_info) def _write_header(self): header = "" if self._step: header += "Step" + self._seperator if self._sim_time: header += "Time (ns)" + self._seperator if self._potential_energy: header += "E_p (kj/mol)" + self._seperator if self._kinetic_energy: header += "E_k (kj/mol)" + self._seperator if self._total_energy: header += "E_t (kj/mol)" + self._seperator if self._temperature: header += "Temperature (K)" + self._seperator if self._volume: header += "Volume (nm^3)" + self._seperator if self._pressure: header += "Pressure (atm)" + self._seperator if self._density: header += "Density (1e3*kg/m^3)" + self._seperator if self._cpu_time: header += "CPU Time" + self._seperator if self._sim_speed: header += "Speed (ns/day)" + self._seperator if self._rest_time: header += "Rest Time" + self._seperator if header != "": header += "\n" self._write_info(header) def _get_step(self, cur_step: int): return "%d" % (cur_step) def _get_sim_time(self, cur_step: int): sim_time = cur_step * self._time_step sim_time = Quantity(sim_time, default_time_unit).convert_to(nanosecond).value return "%.2e" % sim_time def _get_potential_energy(self, ensemble: Ensemble): return ( "%.3f" % Quantity(ensemble.potential_energy, default_energy_unit) .convert_to(kilojoule_permol) .value ) def _get_kinetic_energy(self, ensemble: Ensemble): return ( "%.3f" % Quantity(ensemble.kinetic_energy, default_energy_unit) .convert_to(kilojoule_permol) .value ) def _get_total_energy(self, ensemble: Ensemble): return ( "%.3f" % Quantity(ensemble.total_energy, default_energy_unit) .convert_to(kilojoule_permol) .value ) def _get_temperature(self, ensemble: Ensemble): kinetic_energy = Quantity(ensemble.kinetic_energy, default_energy_unit) num_particles = ensemble.topology.num_particles temperature = kinetic_energy * Quantity(2 / 3 / num_particles) / KB return "%.2f" % temperature.convert_to(default_temperature_unit).value def _get_volume(self, ensemble: Ensemble): pbc_matrix = ensemble.state.pbc_matrix volume = np.cross(pbc_matrix[0, :], pbc_matrix[1, :]) volume = np.dot(volume, pbc_matrix[2, :]) volume = ( Quantity(np.abs(volume), default_length_unit**3) .convert_to(nanometer**3) .value ) return "%.3f" % volume def _get_density(self, ensemble: Ensemble): pbc_matrix = ensemble.state.pbc_matrix volume = np.cross(pbc_matrix[0, :], pbc_matrix[1, :]) volume = np.abs(np.dot(volume, pbc_matrix[2, :])) mass = ensemble.topology.masses.sum() density = ( Quantity(mass / volume, default_mass_unit / default_length_unit**3) .convert_to(kilogram / meter**3) .value ) return "%.3f" % (density / 1000) def _get_cpu_time(self): return "%s" % (self._cpu_time) def _get_rest_time(self, cur_step: int): if cur_step == 0: return "--:--:--" else: unfinished_ratio = 1 - (cur_step / self._total_step) cpu_seconds = self._cpu_time.total_seconds() rest_seconds = cpu_seconds / (1 - unfinished_ratio) * unfinished_ratio return "%s" % timedelta(seconds=int(rest_seconds)) def _get_sim_speed(self, cur_step: int): self._cur_write_time = self._now() cpu_time = self._cpu_time.total_seconds() if cpu_time == 0: return "--:--:--" else: cpu_time = Quantity(cpu_time, second).convert_to(day).value sim_time = cur_step * self._time_step sim_time = ( Quantity(sim_time, default_time_unit).convert_to(nanosecond).value ) return "%.4f" % (sim_time / cpu_time)