Source code for fluidsim.base.output.base

"""Base module for the output
=============================

Provides:

.. autoclass:: OutputBase
   :members:
   :private-members:
   :noindex:

.. autoclass:: OutputBasePseudoSpectral
   :members:
   :private-members:

.. autoclass:: SpecificOutput
   :members:
   :private-members:

.. autoclass:: SimReprMaker
   :members:
   :private-members:
   :noindex:
"""

import datetime
import os
import shutil
import numbers
from copy import copy
from warnings import warn

import numpy as np
import h5py
import matplotlib.pyplot as plt

import fluiddyn
from fluiddyn.util import mpi
from fluiddyn.util import is_run_from_ipython, print_memory_usage
from fluiddyn.io import FLUIDSIM_PATH, Path
from fluidsim_core.output import OutputCore, SimReprMakerCore
from fluidsim_core.params import iter_complete_params

import fluidsim
from fluidsim.util import open_patient, get_mean_values_from_path


class SimReprMaker(SimReprMakerCore):
    """Produce a string representing the simulation"""

    def get_time_as_str(self):
        """Return the time stamp used in the representation of the run.

        The stamp is taken from ``params.path_run`` when
        ``params.NEW_DIR_RESULTS`` is false and either
        ``params.ONLY_COARSE_OPER`` is true or the fields are initialized
        ``"from_file"``.  In every other case the parent implementation is
        used.
        """
        params = self.sim.params
        reuses_existing_dir = not params.NEW_DIR_RESULTS and (
            params.ONLY_COARSE_OPER or params.init_fields.type == "from_file"
        )
        if not reuses_existing_dir:
            return super().get_time_as_str()
        return self.time_from_path_run(params.path_run)
class OutputBase(OutputCore):
    """Handle the output."""

    SimReprMaker = SimReprMaker

    @staticmethod
    def _complete_info_solver(info_solver):
        """Complete the ParamContainer info_solver."""
        classes = info_solver.classes.Output._set_child("classes")

        classes._set_child(
            "PrintStdOut",
            attribs={
                "module_name": "fluidsim.base.output.print_stdout",
                "class_name": "PrintStdOutBase",
            },
        )

        classes._set_child(
            "PhysFields",
            attribs={
                "module_name": "fluidsim.base.output.phys_fields2d",
                "class_name": "PhysFieldsBase2D",
            },
        )

    @staticmethod
    def _complete_params_with_default(params, info_solver):
        """This static method is used to complete the *params* container."""
        attribs = {
            "ONLINE_PLOT_OK": True,
            "period_refresh_plots": 1,
            "HAS_TO_SAVE": True,
            "sub_directory": "",
        }
        p_output = params._set_child("output", attribs=attribs)

        # The cross-reference points to this module
        # (fluidsim.base.output.base).
        p_output._set_doc(
            """
See :mod:`fluidsim.base.output.base`

ONLINE_PLOT_OK: bool (default: True)

    If True, the online plots are enabled.

period_refresh_plots: float (default: 1)

    Period of refreshment of the online plots.

HAS_TO_SAVE: bool (default: True)

    If False, nothing new is saved in the directory of the simulation.

sub_directory: str (default: "")

    A name of a subdirectory where the directory of the simulation is saved.
"""
        )

        p_output._set_child("periods_save")
        p_output.periods_save._set_doc(
            """
Periods (float, in equation time) to set when the specific outputs are saved.
"""
        )

        p_output._set_child("periods_print")
        p_output.periods_print._set_doc(
            """
Periods (float, in equation time) to set when the printing specific outputs are
called.
"""
        )

        p_output._set_child("periods_plot")
        p_output.periods_plot._set_doc(
            """
Periods (float, in equation time) to set when the plots of the specific outputs
are called.
"""
        )

        dict_classes = info_solver.classes.Output.import_classes()
        iter_complete_params(params, info_solver, dict_classes.values())

    def __init__(self, sim):
        """Create the ``print_stdout`` object and disable unwanted periods.

        All plot periods are set to 0 when ``ONLINE_PLOT_OK`` is false and
        all save periods are set to 0 when ``self._has_to_save`` is false.
        """
        super().__init__(sim)

        dict_classes = sim.info.solver.classes.Output.import_classes()

        PrintStdOut = dict_classes["PrintStdOut"]
        self.print_stdout = PrintStdOut(self)

        if not self.params.ONLINE_PLOT_OK:
            for k in self.params.periods_plot._get_key_attribs():
                self.params.periods_plot[k] = 0.0

        if not self._has_to_save:
            for k in self.params.periods_save._get_key_attribs():
                self.params.periods_save[k] = 0.0

    def _init_sim_repr_maker(self):
        """Create the sim-repr maker and let other objects modify it.

        ``self.oper``, ``self.sim`` and the ``Forcing`` class (if any) are
        given the opportunity to modify the maker through an optional
        ``_modify_sim_repr_maker`` hook.
        """
        sim_repr_maker = super()._init_sim_repr_maker()
        sim = self.sim

        # oper should already be initialized at this point
        if hasattr(self, "oper") and hasattr(
            self.oper, "_modify_sim_repr_maker"
        ):
            self.oper._modify_sim_repr_maker(sim_repr_maker)

        if hasattr(sim, "_modify_sim_repr_maker"):
            sim._modify_sim_repr_maker(sim_repr_maker)

        # other classes are not initialized at this point
        dict_classes = sim.info_solver.import_classes()
        try:
            cls = dict_classes["Forcing"]
        except KeyError:
            pass
        else:
            if hasattr(cls, "_modify_sim_repr_maker"):
                cls._modify_sim_repr_maker(sim_repr_maker)

        return sim_repr_maker

    def init_with_oper_and_state(self):
        """Deprecated alias of :meth:`post_init`."""
        warn(
            "This method has been replaced with a more generic name `post_init`.",
            DeprecationWarning,
            # attribute the warning to the caller, not to this line
            stacklevel=2,
        )
        self.post_init()

    def post_init(self):
        """Print information on the run and finish the initialization.

        ``init_with_initialized_state`` is called only if the state is
        initialized and, when the simulation has a forcing, the forcing is
        initialized too.
        """
        sim = self.sim
        super().post_init()

        if mpi.rank == 0:
            objects_to_print = {
                "sim.oper": sim.oper,
                "sim.state": sim.state,
                "sim.time_stepping": sim.time_stepping,
                "sim.init_fields": sim.init_fields,
            }
            if hasattr(sim, "forcing"):
                objects_to_print["sim.forcing"] = sim.forcing
            if hasattr(sim, "preprocess"):
                objects_to_print["sim.preprocess"] = sim.preprocess

            for key, obj in objects_to_print.items():
                self.print_stdout(f"{key + ': ':20s}" + str(obj.__class__))

            # print info on the run
            if hasattr(sim.params.time_stepping, "type_time_scheme"):
                specifications = (
                    sim.params.time_stepping.type_time_scheme + " and "
                )
            else:
                specifications = ""
            if mpi.nb_proc == 1:
                specifications += "sequential,\n"
            else:
                specifications += f"parallel ({mpi.nb_proc} proc.)\n"
            self.print_stdout(
                "\nsolver "
                + self.name_solver
                + ", "
                + specifications
                + self.oper.produce_long_str_describing_oper()
                + "path_run =\n"
                + self.path_run
                + "\n"
                + "init_fields.type: "
                + sim.params.init_fields.type
                + "\n"
            )

            if hasattr(self.sim, "produce_str_describing_params"):
                self.print_stdout(
                    "Important parameters: \n"
                    + self.sim.produce_str_describing_params()
                )

        if mpi.rank == 0 and is_run_from_ipython():
            plt.ion()

        if sim.state.is_initialized:
            if hasattr(sim, "forcing") and not sim.forcing.is_initialized():
                return
            self.init_with_initialized_state()

    def _save_info_solver_params_xml(self, replace=False):
        """Save files with information on the solver and on the run."""
        if (
            mpi.rank == 0
            and self._has_to_save
            and self.sim.params.NEW_DIR_RESULTS
        ):
            super()._save_info_solver_params_xml(
                replace=replace,
                comment=f"FluidSim {fluidsim.get_local_version()}",
            )

    def init_with_initialized_state(self):
        """Instantiate the specific outputs (only once per object).

        Every class listed in ``info.solver.classes.Output`` except
        ``PrintStdOut`` is instantiated with ``self`` and stored as the
        attribute named by its ``_tag``.
        """
        if getattr(self, "_has_been_initialized_with_state", False):
            return
        self._has_been_initialized_with_state = True

        params = self.sim.params
        # just for the first output
        if (
            hasattr(params.time_stepping, "USE_CFL")
            and params.time_stepping.USE_CFL
        ):
            self.sim.time_stepping.compute_time_increment_CLF()

        if hasattr(self.sim, "forcing") and params.output.HAS_TO_SAVE:
            self.sim.forcing.compute()

        self.print_stdout("Initialization outputs:")

        self.print_stdout.complete_init_with_state()

        dict_classes = copy(self.sim.info.solver.classes.Output.import_classes())

        # The class PrintStdOut has already been instantiated.
        dict_classes.pop("PrintStdOut")

        # to get always the same order (important with mpi)
        keys = sorted(dict_classes.keys())
        classes = [dict_classes[key] for key in keys]

        for Class in classes:
            if mpi.rank == 0:
                self.print_stdout(
                    f"{'sim.output.' + Class._tag + ':':30s}" + str(Class)
                )
            self.__dict__[Class._tag] = Class(self)

        print_memory_usage("\nMemory usage at the end of init. (equiv. seq.)")

        try:
            self.print_size_in_Mo(self.sim.state.state_spect, "state_spect")
        except AttributeError:
            self.print_size_in_Mo(self.sim.state.state_phys, "state_phys")

    def one_time_step(self):
        """Call the online print/plot/save hooks of the active outputs.

        An output is active for an action when its period is not 0.
        """
        for k in self.params.periods_print._get_key_attribs():
            period = self.params.periods_print.__dict__[k]
            if period != 0:
                self.__dict__[k]._online_print()

        if self.params.ONLINE_PLOT_OK:
            for k in self.params.periods_plot._get_key_attribs():
                period = self.params.periods_plot.__dict__[k]
                if period != 0:
                    self.__dict__[k]._online_plot()

        if self._has_to_save:
            for k in self.params.periods_save._get_key_attribs():
                period = self.params.periods_save.__dict__[k]
                if period != 0:
                    self.__dict__[k]._online_save()

    def figure_axe(self, numfig=None, size_axe=None):
        """Return ``(fig, ax)`` on the MPI process of rank 0.

        Parameters
        ----------
        numfig:
            If not None, the figure with this number is used and cleared.
        size_axe:
            If not None, passed to ``fig.add_axes``.

        Returns
        -------
        tuple or None
            ``(fig, ax)`` on rank 0, None on the other processes.
        """
        if mpi.rank != 0:
            return None

        if size_axe is None and numfig is None:
            return plt.subplots()

        if numfig is None:
            fig = plt.figure()
        else:
            fig = plt.figure(numfig)
            fig.clf()

        if size_axe is not None:
            ax = fig.add_axes(size_axe)
        else:
            ax = fig.subplots()

        return fig, ax

    def close_files(self):
        """Close ``print_stdout`` and the files of the saving outputs."""
        if mpi.rank == 0 and self._has_to_save:
            self.print_stdout.close()
            for k in self.params.periods_save._get_key_attribs():
                period = self.params.periods_save.__dict__[k]
                if period != 0:
                    if hasattr(self.__dict__[k], "_close_file"):
                        self.__dict__[k]._close_file()

    def end_of_simul(self, total_time):
        """Finalize the outputs at the end of the simulation.

        Performs the last saves, prints the computation time, closes the
        files and, if ``path_run`` does not start with ``FLUIDSIM_PATH``,
        moves the result directory under ``FLUIDSIM_PATH``.

        Parameters
        ----------
        total_time: float
            Duration (in s) printed in the final message.
        """
        # self.path_run: str
        if self._has_to_save:
            if hasattr(self.sim, "forcing"):
                self.sim.forcing.compute()
            self.one_time_step()
            if self.sim.output.phys_fields.t_last_save < self.sim.time_stepping.t:
                self.phys_fields.save()

        path_run = Path(self.path_run)
        self.print_stdout(
            f"Computation completed in {total_time:8.6g} s\n"
            f"path_run =\n{path_run}"
        )
        self.close_files()

        if not self.path_run.startswith(FLUIDSIM_PATH):
            path_base = Path(FLUIDSIM_PATH)
            if self.params.sub_directory:
                path_base = path_base / self.params.sub_directory

            new_path_run = path_base / self.sim.name_run

            try:
                if new_path_run.parent.samefile(path_run.parent):
                    return
            except OSError:
                pass

            # find a name which is not already used
            i = 1
            while new_path_run.exists():
                new_path_run = new_path_run.with_name(self.sim.name_run + f"_{i}")
                i += 1

            if mpi.rank == 0 and path_run.exists():
                # exist_ok avoids the check-then-create race
                os.makedirs(path_base, exist_ok=True)

                shutil.move(self.path_run, new_path_run)
                print(f"move result directory in directory:\n{new_path_run}")

            self.path_run = str(new_path_run)
            for spec_output in list(self.__dict__.values()):
                if isinstance(spec_output, SpecificOutput):
                    try:
                        spec_output._init_path_files()
                    except AttributeError:
                        pass

        if mpi.nb_proc > 1:
            mpi.comm.barrier()

    def compute_energy(self):
        """Return 0.0 (placeholder implementation of this base class)."""
        return 0.0

    def print_size_in_Mo(self, arr, string=None):
        """Print the size of an array in "Mo" (``arr.nbytes * 1e-6``).

        With more than one MPI process, the sizes are summed over the
        processes.

        Parameters
        ----------
        arr:
            Object with a ``nbytes`` attribute.
        string: str, optional
            Name used in the printed message.
        """
        if string is None:
            string = "Size of ndarray (equiv. seq.)"
        else:
            string = "Size of " + string + " (equiv. seq.)"

        mem = arr.nbytes * 1.0e-6
        if mpi.nb_proc > 1:
            mem = mpi.comm.allreduce(mem, op=mpi.MPI.SUM)
        self.print_stdout(string.ljust(30) + f": {mem} Mo")

    def get_mean_values(
        self, tmin=None, tmax=None, use_cache=True, customize=None
    ):
        """Get a dict of scalar values characterizing the simulation

        Parameters
        ----------

        tmin: float
            Minimum time

        tmax: float
            Maximum time

        use_cache: bool
            If True, return the cached result

        customize: callable

            If not None, called as ``customize(result, self.sim)`` to modify the
            returned dict.

        Examples
        --------

        .. code-block:: python

            def customize(result, sim):
                result["Rb"] = float(sim.params.short_name_type_run.split("_Rb")[-1])
            sim.output.get_mean_values(customize=customize)

        """
        return get_mean_values_from_path(
            self.path_run, tmin, tmax, use_cache, customize
        )

    def _compute_mean_values(self, tmin, tmax):
        """Gather averaged quantities into a dict.

        The values come from ``self.spatial_means``, ``self.spectra`` and
        ``self.spect_energy_budg``.  ``"R4"`` defaults to ``np.inf`` when it
        is not among the averages.
        """
        result = {}
        try:
            result["N"] = self.sim.params.N
        except AttributeError:
            pass

        averages = self.spatial_means.get_dimless_numbers_averaged(
            tmin=tmin, tmax=tmax
        )

        for key in ["Uh2", "epsK", "EKh", "EKz", "k_max"]:
            result[key] = averages["dimensional"][key]

        for key in ["epsA", "EA"]:
            try:
                result[key] = averages["dimensional"][key]
            except KeyError:
                pass

        for key in ["Gamma", "Fh", "R2", "k_max*eta", "epsK2/epsK"]:
            try:
                result[key] = averages[key]
            except KeyError:
                pass

        try:
            result["R4"] = averages["R4"]
        except KeyError:
            result["R4"] = np.inf

        data_spectra = self.spectra.load1d_mean(tmin, tmax, verbose=False)
        result["I_velocity"] = self.spectra.compute_isotropy_velocities(
            data=data_spectra
        )
        lengths = self.spectra.compute_length_scales(data=data_spectra)
        result.update(lengths)

        # NOTE(review): only tmin is forwarded here (not tmax) -- confirm
        # against the signature of compute_isotropy_dissipation.
        result["I_dissipation"] = (
            self.spect_energy_budg.compute_isotropy_dissipation(tmin)
        )
        return result
class OutputBasePseudoSpectral(OutputBase):
    """Output class using ``state_spect`` and ``oper.sum_wavenumbers``."""

    def post_init(self):
        """Bind ``oper.sum_wavenumbers`` then run the parent ``post_init``."""
        self.sum_wavenumbers = self.oper.sum_wavenumbers
        super().post_init()

    def compute_energy_fft(self):
        """Compute energy(k)"""
        state = self.sim.state
        total = 0.0
        for key in state.keys_state_spect:
            # half of the squared modulus of each spectral variable
            squared_modulus = np.abs(state.state_spect.get_var(key)) ** 2
            total += squared_modulus / 2.0
        return total

    def compute_energy(self):
        """Compute the spatially averaged energy."""
        return self.sum_wavenumbers(self.compute_energy_fft())
class SpecificOutput:
    """Small class for features useful for specific outputs"""

    def __init__(
        self,
        output,
        period_save=0,
        period_plot=0,
        has_to_plot_saved=False,
        arrays_1st_time=None,
    ):
        """Store the periods and initialize paths, plots and files.

        Parameters
        ----------
        output:
            The output object; ``output.sim``, ``output.path_run`` and
            ``output._has_to_save`` are used.
        period_save, period_plot:
            Periods for saving and plotting (0 means disabled).
        has_to_plot_saved: bool
            If true (and online plots are enabled), the saved values are
            plotted.
        arrays_1st_time: dict, optional
            Passed to ``_init_files``.
        """
        sim = output.sim
        params = sim.params

        self.output = output
        self.sim = sim
        self.oper = sim.oper
        self.params = params

        self.period_save = period_save
        self.period_plot = period_plot
        self.has_to_plot = has_to_plot_saved

        if not params.output.ONLINE_PLOT_OK:
            self.period_plot = 0
            self.has_to_plot = False

        if not has_to_plot_saved:
            self.has_to_plot = self.period_plot > 0

        self.period_show = params.output.period_refresh_plots
        self.t_last_show = 0.0

        self._init_path_files()

        if hasattr(self, "_cls_movies"):
            self.movies = self._cls_movies(output, self)
            self.animate = self.movies.animate
            self.interact = self.movies.interact

        if self.has_to_plot:
            self._init_online_plot()

        if not output._has_to_save:
            self.period_save = 0.0

        if self.period_save != 0.0:
            self._init_files(arrays_1st_time)

    def _init_path_files(self):
        """Set ``self.path_file`` if the class defines ``_name_file``."""
        if hasattr(self, "_name_file"):
            self.path_file = os.path.join(self.output.path_run, self._name_file)

    def _init_files(self, arrays_1st_time=None):
        """Create the file, or append to it if it already exists.

        ``self.compute()`` is called on every process; the file is only
        accessed on the process of rank 0.
        """
        if arrays_1st_time is None:
            arrays_1st_time = {}
        dict_results = self.compute()
        if mpi.rank == 0:
            if not os.path.exists(self.path_file):
                self._create_file_from_dict_arrays(
                    self.path_file, dict_results, arrays_1st_time
                )
                self.nb_saved_times = 1
            else:
                with h5py.File(self.path_file, "r") as file:
                    dset_times = file["times"]
                    self.nb_saved_times = dset_times.shape[0] + 1
                self._add_dict_arrays_to_file(self.path_file, dict_results)
        self.t_last_save = self.sim.time_stepping.t

    def _online_save(self):
        """Save the values at one time."""
        tsim = self.sim.time_stepping.t
        if tsim - self.t_last_save >= self.period_save:
            self.t_last_save = tsim
            dict_results = self.compute()
            if mpi.rank == 0:
                self._add_dict_arrays_to_file(self.path_file, dict_results)
                self.nb_saved_times += 1
                if self.has_to_plot:
                    self._online_plot_saving(dict_results)
                    if tsim - self.t_last_show >= self.period_show:
                        self.t_last_show = tsim
                        self.fig.canvas.draw()
                        # needed to really show the figures
                        plt.pause(1e-3)

    def _create_file_from_dict_arrays(
        self, path_file, dict_matrix, arrays_1st_time
    ):
        """Create the HDF5 file and write the values of the first time.

        Parameters
        ----------
        path_file: str
            Path of the file. Nothing is written if it already exists.
        dict_matrix: dict
            Results saved at each time; each dataset gets a leading
            extendable dimension.
        arrays_1st_time: dict
            Arrays saved once, as plain datasets.
        """
        if os.path.exists(path_file):
            print("file NOT created since it already exists!")
        elif mpi.rank == 0:
            with h5py.File(path_file, "w") as file:
                file.attrs["date saving"] = str(datetime.datetime.now()).encode()
                file.attrs["name_solver"] = self.output.name_solver
                file.attrs["name_run"] = self.output.name_run

                self.sim.info._save_as_hdf5(hdf5_parent=file)

                times = np.array([self.sim.time_stepping.t], dtype=np.float64)
                file.create_dataset("times", data=times, maxshape=(None,))

                for k, v in arrays_1st_time.items():
                    file.create_dataset(k, data=v)

                for k, v in dict_matrix.items():
                    if isinstance(v, numbers.Number):
                        arr = np.array([v], dtype=v.__class__)
                        file.create_dataset(k, data=arr, maxshape=(None,))
                    else:
                        arr = np.array(v)
                        # shape of the converted array, so that lists and
                        # tuples (without ``.shape``) are supported too
                        shape = arr.shape
                        file.create_dataset(
                            k,
                            data=arr.reshape((1,) + shape),
                            maxshape=((None,) + shape),
                        )

    def _add_dict_arrays_to_file(self, path_file, dict_matrix):
        """Append the values of one time to an existing file.

        Raises
        ------
        ValueError
            If ``path_file`` does not exist.
        """
        if not os.path.exists(path_file):
            raise ValueError("can not add dict arrays in nonexisting file!")

        if mpi.rank == 0:
            with open_patient(path_file, "r+") as file:
                dset_times = file["times"]
                nb_saved_times = dset_times.shape[0]
                dset_times.resize((nb_saved_times + 1,))
                dset_times[nb_saved_times] = self.sim.time_stepping.t
                for k, v in dict_matrix.items():
                    if isinstance(v, numbers.Number):
                        new_shape = (nb_saved_times + 1,)
                    else:
                        # np.shape also works for lists and tuples
                        new_shape = (nb_saved_times + 1,) + np.shape(v)
                    dset_k = file[k]
                    dset_k.resize(new_shape)
                    dset_k[nb_saved_times] = v

    def _add_dict_arrays_to_open_file(self, file, dict_arrays, nb_saved_times):
        """Append the values of one time to an already open file."""
        if mpi.rank == 0:
            dset_times = file["times"]
            dset_times.resize((nb_saved_times + 1,))
            dset_times[nb_saved_times] = self.sim.time_stepping.t
            for k, v in dict_arrays.items():
                dset_k = file[k]
                dset_k.resize((nb_saved_times + 1, v.size))
                dset_k[nb_saved_times] = v

    def _has_to_online_save(self):
        """Tell whether a new multiple of ``period_save`` has been reached.

        Returns False when ``period_save`` is 0: ``__init__`` uses this
        value when saving is disabled, and a floor division by 0 would
        fail.
        """
        period = self.period_save
        if period == 0:
            return False
        # 1e-15 is a small tolerance added to the current time
        t_now = self.sim.time_stepping.t + 1e-15
        return t_now // period > self.t_last_save // period

    def _init_online_plot(self):
        """Hook called by ``__init__`` when ``has_to_plot`` is true."""
        pass

    def _online_plot_saving(self, dict_results):
        """Hook called by ``_online_save`` to plot the saved values."""
        pass

    def compute(self):
        """Return the results to be saved (to be implemented by subclasses)."""
        raise NotImplementedError