"""Utilities for the numerical simulations (:mod:`fluidsim.util.util`)
======================================================================
"""
import json
import os
import time
import warnings
from copy import deepcopy as _deepcopy
from datetime import timedelta
from functools import partial
from importlib import import_module
from pathlib import Path
from time import perf_counter
from math import radians
import h5netcdf
import h5py
import numpy as np
import fluiddyn as fld
from fluiddyn.io.redirect_stdout import stdout_redirected
from fluiddyn.util import mpi
from fluiddyn.util import get_memory_usage
from fluidsim_core import loader
from fluidsim_core.output.dataframe_from_paths import DataframeMaker
from fluidsim_core.paths import find_path_result_dir
from fluidsim.base.params import (
Parameters,
fix_old_params,
load_info_solver,
load_params_simul,
merge_params,
)
from fluidsim.base.solvers.info_base import create_info_simul
from fluidsim.extend_simul import _extend_simul_class_from_path
from .output import save_file
available_solvers = partial(
loader.available_solvers, entrypoint_grp="fluidsim.solvers"
)
import_module_solver_from_key = partial(
loader.import_module_solver, entrypoint_grp="fluidsim.solvers"
)
import_simul_class_from_key = partial(
loader.import_cls_simul, entrypoint_grp="fluidsim.solvers"
)
# A bad hack to silence a h5netcdf warning
h5netcdf.File = partial(h5netcdf.File, decode_vlen_strings=True)
# FutureWarning: String decoding changed with h5py >= 3.0. See
# https://docs.h5py.org/en/latest/strings.html and
# https://github.com/h5netcdf/h5netcdf/issues/132 for more details. Currently
# backwards compatibility with h5py < 3.0 is kept by decoding vlen strings per
# default. This will change in future versions for consistency with h5py >= 3.0.
# To silence this warning set kwarg ``decode_vlen_strings=False`` which will
# return Python bytes from variables containing vlen strings. Setting
# ``decode_vlen_strings=True`` forces vlen string decoding which returns Python
# strings from variables containing vlen strings.
def print_memory_usage_seq(message, flush=None):
mem = get_memory_usage()
print(message, f"{mem/1024: 7.3f} Go", flush=flush)
def available_solver_keys():
"""List all available solvers.
Returns
-------
list
"""
return sorted(available_solvers())
def get_dim_from_solver_key(key):
"""Try to guess the dimension from the solver key (via the operator name)."""
cls = import_simul_class_from_key(key)
info = cls.InfoSolver(only_root=True)
class_name = info.classes.Operators.class_name
# special cases:
if class_name == "OperatorsPseudoSpectralSW1L":
return "2"
for dim in range(4):
if str(dim) in class_name:
return str(dim)
raise NotImplementedError(
"Cannot deduce dimension of the solver from the name " + class_name
)
pathdir_from_namedir = find_path_result_dir
class ModulesSolvers(dict):
"""Dictionary to gather imported solvers."""
def __init__(self, names_solvers):
for key in names_solvers:
self[key] = import_module_solver_from_key(key)
def name_file_from_time_approx(path_dir, t_approx=None):
"""Return the file name whose time is the closest to the given time.
Parameters
----------
path_dir: Path or str
Path of the directory of the simulation.
t_approx : number or "last" (optional)
Approximate time of the file to be loaded.
.. todo::
Can be elegantly implemented using regex as done in
``fluidsim.base.output.phys_fields.time_from_path``
"""
if not isinstance(path_dir, Path):
path_dir = Path(path_dir)
path_files = sorted(path_dir.glob("state_phys_t*"))
nb_files = len(path_files)
if nb_files == 0 and mpi.rank == 0:
raise ValueError("No state file in the dir\n" + str(path_dir))
if t_approx is None:
# should be the last one but not 100% sure
return path_files[-1].name
name_files = [path.name for path in path_files]
if "state_phys_t=" in name_files[0]:
ind_start_time = len("state_phys_t=")
else:
ind_start_time = len("state_phys_t")
times = np.empty([nb_files])
for ii, name in enumerate(name_files):
tmp = ".".join(name[ind_start_time:].split(".")[:2])
if "_" in tmp:
tmp = tmp[: tmp.index("_")]
times[ii] = float(tmp)
if t_approx == "last":
t_approx = times.max()
i_file = abs(times - t_approx).argmin()
name_file = path_files[i_file].name
return name_file
[docs]
def load_sim_for_plot(
name_dir=None, merge_missing_params=False, hide_stdout=False
):
"""Create a object Simul from a dir result.
Creating simulation objects with this function should be fast because the
state is not initialized with the output file and only a coarse operator is
created.
Parameters
----------
name_dir : str (optional)
Name of the directory of the simulation. If nothing is given, we load the
data in the current directory.
Can be an absolute path, a relative path, or even simply just
the name of the directory under $FLUIDSIM_PATH.
merge_missing_params : bool (optional, default == False)
Can be used to load old simulations carried out with an old fluidsim
version.
hide_stdout : bool (optional, default == False)
If True, without stdout.
"""
path_dir = pathdir_from_namedir(name_dir)
info_solver = load_info_solver(path_dir)
if hasattr(info_solver, "loader"):
name_module, name_func = info_solver.loader.split(".")
mod = import_module(name_module)
loader = getattr(mod, name_func)
return loader(path_dir)
solver = import_module(info_solver.module_name)
params = load_params_simul(path_dir)
if merge_missing_params:
merge_params(params, solver.Simul.create_default_params())
params.path_run = path_dir
params.init_fields.type = "constant"
params.init_fields.modif_after_init = False
params.ONLY_COARSE_OPER = True
params.NEW_DIR_RESULTS = False
params.output.HAS_TO_SAVE = False
params.output.ONLINE_PLOT_OK = False
try:
params.preprocess.enable = False
except AttributeError:
pass
try:
params.oper.type_fft = "default"
params.oper.type_fft2d = "sequential"
except AttributeError:
pass
fix_old_params(params)
try:
path_file = Path(path_dir) / name_file_from_time_approx(path_dir)
except ValueError:
# no state_phys file to extend the simul class. Let's try with info_solver.xml
path_file = Path(path_dir) / "info_solver.xml"
if path_file.exists():
Simul = _extend_simul_class_from_path(solver.Simul, path_file)
else:
Simul = solver.Simul
with stdout_redirected(hide_stdout):
sim = Simul(params)
return sim
def _import_solver_from_path(path_dir):
info_solver = load_info_solver(path_dir)
solver = import_module(info_solver.module_name)
return solver
[docs]
def load_state_phys_file(
name_dir=None,
t_approx="last",
modif_save_params=True,
merge_missing_params=False,
init_with_initialized_state=True,
hide_stdout=False,
):
"""Create a simulation from a file.
For large resolution, creating a simulation object with this function can
be slow because the state is initialized with the output file.
Parameters
----------
name_dir : str (optional)
Name of the directory of the simulation. If nothing is given, we load the
data in the current directory.
Can be an absolute path, a relative path, or even simply just
the name of the directory under $FLUIDSIM_PATH.
t_approx : number (optional)
Approximate time of the file to be loaded.
modif_save_params : bool (optional, default == True)
If True, the parameters of the simulation are modified before loading::
params.output.HAS_TO_SAVE = False
params.output.ONLINE_PLOT_OK = False
merge_missing_params : bool (optional, default == False)
Can be used to load old simulations carried out with an old fluidsim
version.
init_with_initialized_state : bool (optional, default == True)
If True, call sim.output.init_with_initialized_state.
hide_stdout : bool (optional, default == False)
If True, without stdout.
"""
params, Simul = load_for_restart(name_dir, t_approx, merge_missing_params)
if modif_save_params:
params.output.HAS_TO_SAVE = False
params.output.ONLINE_PLOT_OK = False
params.ONLY_COARSE_OPER = False
try:
params.oper.type_fft = "default"
params.oper.type_fft2d = "sequential"
except AttributeError:
pass
with stdout_redirected(hide_stdout):
sim = Simul(params)
if init_with_initialized_state:
sim.output.init_with_initialized_state()
return sim
def _path_file_from_time_approx(thing, t_approx):
if thing is not None and Path(thing).is_file():
path_file = Path(thing)
else:
path_dir = pathdir_from_namedir(thing)
# choose the file with the time closer to t_approx
name_file = name_file_from_time_approx(path_dir, t_approx)
path_file = Path(path_dir) / name_file
return path_file
[docs]
def load_for_restart(name_dir=None, t_approx="last", merge_missing_params=False):
"""Load params and Simul for a restart.
>>> params, Simul = load_for_restart(name_dir)
Parameters
----------
name_dir : str (optional)
Name of the directory of the simulation. If nothing is given, we load the
data in the current directory.
Can be an absolute path, a relative path, or even simply just
the name of the directory under $FLUIDSIM_PATH.
t_approx : number (optional)
Approximate time of the file to be loaded.
merge_missing_params : bool (optional, default == False)
Can be used to load old simulations carried out with an old fluidsim
version.
"""
path_file = _path_file_from_time_approx(name_dir, t_approx)
path_dir = path_file.parent
solver = _import_solver_from_path(path_dir)
Simul = _extend_simul_class_from_path(solver.Simul, path_file)
if merge_missing_params:
# this has to be done by all processes otherwise there is a problem
# with Transonic (see https://foss.heptapod.net/fluiddyn/fluidsim/issues/26)
default_params = Simul.create_default_params()
if mpi.rank > 0:
params = None
else:
with h5py.File(path_file, "r") as file:
params = Parameters(hdf5_object=file["info_simul"]["params"])
if merge_missing_params:
merge_params(params, default_params)
params.path_run = path_dir
params.NEW_DIR_RESULTS = False
params.init_fields.type = "from_file"
params.init_fields.from_file.path = path_file
params.init_fields.modif_after_init = False
try:
params.preprocess.enable = False
except AttributeError:
pass
fix_old_params(params)
if mpi.nb_proc > 1:
params = mpi.comm.bcast(params, root=0)
return params, Simul
def modif_resolution_all_dir(t_approx=None, coef_modif_resol=2, dir_base=None):
"""Save files with a modified resolution."""
raise DeprecationWarning("Sorry, use modif_resolution_from_dir instead")
[docs]
def modif_resolution_from_dir(
name_dir=None, t_approx="last", coef_modif_resol=2, PLOT=True
):
"""Save a file with a modified resolution."""
sim = load_state_phys_file(name_dir, t_approx)
params2 = _deepcopy(sim.params)
params2.output.HAS_TO_SAVE = True
params2.oper.nx = int(sim.params.oper.nx * coef_modif_resol)
params2.oper.ny = int(sim.params.oper.ny * coef_modif_resol)
try:
params2.oper.nz = int(sim.params.oper.nz * coef_modif_resol)
except AttributeError:
dimension = 2
else:
dimension = 3
params2.init_fields.type = "from_simul"
sim2 = sim.__class__(params2)
sim2.init_fields.get_state_from_simul(sim)
oper_new = sim2.params.oper
if dimension == 3:
dir_new_file = f"/State_phys_{oper_new.nx}x{oper_new.ny}x{oper_new.nz}"
else:
dir_new_file = f"/State_phys_{oper_new.nx}x{oper_new.ny}"
sim2.output.path_run = str(sim.output.path_run) + dir_new_file
print("Save file in directory\n" + sim2.output.path_run)
sim2.output.phys_fields.save(particular_attr="modif_resolution")
print("The new file is saved.")
if PLOT:
sim.output.phys_fields.plot(numfig=0)
sim2.output.phys_fields.plot(numfig=1)
fld.show()
class StatePhysLike:
def __init__(self, path_file, oper, oper2):
self.path_file = path_file
self.oper = oper
self.oper2 = oper2
self.info = "state_phys"
self.field = oper.create_arrayX()
self.field_spect = oper.create_arrayK()
print_memory_usage_seq("Memory usage after init fields: ")
self.field2 = oper2.create_arrayX()
print(
"size field2: "
f"{self.field2.nbytes / 1024**3:7.3f} Go"
)
print_memory_usage_seq("Memory usage after init field2: ")
self.field2_spect = oper2.create_arrayK(0)
print(
"size field2_spect: "
f"{self.field2_spect.nbytes / 1024**3:7.3f} Go"
)
print_memory_usage_seq("Memory usage after init field2_spect: ")
if path_file.suffix == ".nc":
self.h5pack = h5netcdf
else:
self.h5pack = h5py
with self.h5pack.File(self.path_file, "r") as h5file:
group_state_phys = h5file["/state_phys"]
self.keys = list(group_state_phys.keys())
self.time = float(group_state_phys.attrs["time"])
self.it = int(group_state_phys.attrs["it"])
self.name_run = h5file.attrs["name_run"]
def get_var(self, key):
print(f'get_var("{key}")')
def start_counter(message):
print(f"- {message + '...':30s}", end="", flush=True)
return perf_counter()
def end_counter(t_start):
print(f"done in {timedelta(seconds=perf_counter() - t_start)}")
t_start = start_counter("reading field from disk")
with self.h5pack.File(self.path_file, "r") as h5file:
group_state_phys = h5file["/state_phys"]
self.field[:] = group_state_phys[key][...]
end_counter(t_start)
t_start = start_counter("forward fft smaller field")
self.oper.fft_as_arg(self.field, self.field_spect)
end_counter(t_start)
dimension = len(self.field_spect.shape)
if dimension not in [2, 3]:
raise NotImplementedError
t_start = start_counter("filling field2_fft")
self.oper2.fill_field_fft(self.field_spect, self.field2_spect, self.oper)
end_counter(t_start)
t_start = start_counter("backward fft field2")
self.oper2.ifft_as_arg(self.field2_spect, self.field2)
end_counter(t_start)
return self.field2
[docs]
def modif_resolution_from_dir_memory_efficient(
name_dir=None, t_approx="last", coef_modif_resol=2
):
"""Save a file with a modified resolution.
Faster and more memory efficient than ``modif_resolution_from_dir`` (but
not plot).
"""
t_start = perf_counter()
if mpi.nb_proc > 1:
raise NotImplementedError
path_file = _path_file_from_time_approx(name_dir, t_approx)
print(f"Changing resolution of the state contained in\n{path_file}")
path_dir = path_file.parent
solver = _import_solver_from_path(path_dir)
Simul = _extend_simul_class_from_path(solver.Simul, path_file)
try:
info_solver = Simul.info_solver
except AttributeError:
info_solver = Simul.InfoSolver()
with h5py.File(path_file, "r") as h5file:
params = Parameters(hdf5_object=h5file["/info_simul/params"])
try:
params.oper.type_fft = "default"
params.oper.type_fft2d = "sequential"
except AttributeError:
pass
params2 = _deepcopy(params)
params2.output.HAS_TO_SAVE = True
nx2 = params2.oper.nx = int(params.oper.nx * coef_modif_resol)
ny2 = params2.oper.ny = int(params.oper.ny * coef_modif_resol)
try:
nz2 = params2.oper.nz = int(params.oper.nz * coef_modif_resol)
except AttributeError:
dimension = 2
shape = (params.oper.ny, params.oper.nx)
shape2 = (ny2, nx2)
else:
dimension = 3
shape = (params.oper.nz, params.oper.ny, params.oper.nx)
shape2 = (nz2, ny2, nx2)
from .mini_oper_modif_resol import MiniOperModifResol
oper = MiniOperModifResol(shape)
print_memory_usage_seq(
'Memory usage after init operator "input": ', flush=True
)
oper2 = MiniOperModifResol(shape2)
print_memory_usage_seq('Memory usage after init operator "output":')
info2 = create_info_simul(info_solver, params2)
state_phys = StatePhysLike(path_file, oper, oper2)
if dimension == 3:
dir_new_new = f"State_phys_{nx2}x{ny2}x{nz2}"
else:
dir_new_new = f"State_phys_{nx2}x{ny2}"
path_file_out = path_file.parent / dir_new_new / path_file.name
path_file_out.parent.mkdir(exist_ok=True)
print(f"Saving file {path_file_out.name}...", flush=True)
save_file(
path_file_out,
state_phys,
info2,
state_phys.name_run,
oper2,
state_phys.time,
state_phys.it,
particular_attr="modif_resolution",
)
print(
f"File {path_file_out.name} saved in:\n{path_file_out.parent}\n"
f"total duration: {timedelta(seconds=perf_counter() - t_start)}"
)
[docs]
def times_start_last_from_path(path):
"""Return the start and last times from a result directory path."""
path_file = Path(path) / "stdout.txt"
if not path_file.exists():
print(f"Given path does not exist:\n{path}")
return 666, 666
with open(path_file, "r") as file_stdout:
line = ""
while not line.startswith("it ="):
line = file_stdout.readline()
words = line.split()
t_s = float(words[6])
# in order to get the information at the end of the file,
# we do not want to read the full file...
file_stdout.seek(0, os.SEEK_END) # go to the end
nb_caract = file_stdout.tell()
nb_caract_to_read = min(nb_caract, 1000)
file_stdout.seek(file_stdout.tell() - nb_caract_to_read, os.SEEK_SET)
while line != "":
if line.startswith("it ="):
line_it = line
last_line = line
line = file_stdout.readline()
if last_line.startswith("save state_phys"):
name_file = last_line.split()[-1]
name_file, ext = os.path.splitext(name_file)
word = name_file.split("_it=")[0].split("state_phys_t")[-1]
t_last = float(word.replace(ext, ""))
else:
words = line_it.split()
t_last = float(words[6])
return t_s, t_last
def times_start_end_from_path(path):
"""deprecated, use times_start_last_from_path instead"""
warnings.warn(
"times_start_end_from_path is deprecated please use times_start_last_from_path",
category=DeprecationWarning,
)
return times_start_last_from_path(path)
[docs]
def get_last_estimated_remaining_duration(path):
"""Get last estimated remaining duration written in stdout.txt"""
path_file = Path(path) / "stdout.txt"
if not path_file.exists():
raise ValueError(f"No file stdout.txt in {path}")
with open(path_file, "r") as file_stdout:
# in order to get the information at the end of the file,
# we do not want to read the full file...
file_stdout.seek(0, os.SEEK_END) # go to the end
nb_caract = file_stdout.tell()
nb_caract_to_read = min(nb_caract, 3000)
file_stdout.seek(file_stdout.tell() - nb_caract_to_read, os.SEEK_SET)
txt = file_stdout.read()
for line in txt.split("\n")[::-1]:
line = line.strip()
if line.startswith("estimated remaining duration"):
return line.split(" = ")[1].strip()
raise RuntimeError(f"No estimated remaining duration in file {path_file}")
[docs]
def open_patient(
path,
*args,
time_wait_total=200,
time_wait_once=2,
class_file=h5py.File,
**kwargs,
):
"""Open a hdf5 type file in a "patient" way.
If the file is already opened by another process (``errno==11`` for hdf5),
the error is caught and we retry later.
Parameters
----------
time_wait_total : number (optional)
Time to wait before raising the error.
time_wait_once : number (optional)
Time between attempts.
class_file : type (optional)
Class of the file (default ``h5py.File``, but could be ``h5netcdf.File``).
"""
time_first_try = time.time()
while True:
try:
file = class_file(path, *args, **kwargs)
break
except OSError as error:
errno = int(repr(error).split("errno = ")[1].split(",")[0])
if errno != 11 or time.time() - time_first_try > time_wait_total:
raise
time.sleep(time_wait_once)
return file
[docs]
def ensure_radians(angle):
"""Convert strings like "45°" to radians (as float)"""
if isinstance(angle, str):
if angle.endswith("°"):
angle = radians(float(angle[:-1]))
else:
raise ValueError(
"Angle should be a string with \n"
+ "the degree symbol or a float in radians"
)
return angle
def get_last_time_spatial_means_from_path(path):
path_file = Path(path) / "spatial_means.txt"
if path_file.exists():
with open(path_file, "rb") as file_means:
nb_char = file_means.seek(0, os.SEEK_END) # go to the end
nb_char_to_read = min(nb_char, 1000)
file_means.seek(-nb_char_to_read, 2)
line = file_means.readline()
while line != b"":
if line.startswith(b"time ="):
line_time = line
line = file_means.readline()
words = line_time.split()
return float(words[2])
path_file = path_file.with_suffix(".json")
if path_file.exists():
with open(path_file, "rb") as file_means:
nb_char = file_means.seek(0, os.SEEK_END) # go to the end
nb_char_to_read = min(nb_char, 1000)
file_means.seek(-nb_char_to_read, 2)
line = file_means.readline()
while line != b"":
line_prev = line
line = file_means.readline()
return json.loads(line_prev)["t"]
class _DataframeMakerFluidsim(DataframeMaker):
def get_time_start_from_path(self, path):
""" "Get first time"""
t_start, _ = times_start_last_from_path(path)
return t_start
def get_time_last_from_path(self, path):
""" "Get last saved time"""
return get_last_time_spatial_means_from_path(path)
def load_sim(self, path):
"""Load a simulation object"""
return load_sim_for_plot(path, hide_stdout=True)
def get_dataframe_from_paths(
self, paths, tmin=None, tmax=None, use_cache=True, customize=None
):
df = super().get_dataframe_from_paths(
paths, tmin, tmax, use_cache, customize
)
if "R2" in df.columns and "R4" in df.columns:
df["min_R"] = np.array([df.R2, df.R4]).min(axis=0)
return df
_dataframe_maker = _DataframeMakerFluidsim()
get_mean_values_from_path = _dataframe_maker.get_mean_values_from_path
get_dataframe_from_paths = _dataframe_maker.get_dataframe_from_paths