Source code for fluidsim.base.output.spatial_means

"""Spatial means


.. autoclass:: SpatialMeansBase

.. autoclass:: SpatialMeansJSON


import os
import numpy as np
import json
from typing import Dict

import pandas as pd
import xarray as xr
from fluiddyn.util import mpi

from .base import SpecificOutput

def inner_prod(a_fft, b_fft):
    return np.real(a_fft.conj() * b_fft)

[docs] class SpatialMeansBase(SpecificOutput): """A :class:`SpatialMean` object handles the saving of . This class uses the particular functions defined by some solvers :func:`` and :func``. If the solver doesn't has these functions, this class does nothing. """ _tag = "spatial_means" _name_file = _tag + ".txt"
[docs] @staticmethod def _complete_params_with_default(params): tag = "spatial_means" params.output.periods_save._set_attrib(tag, 0) params.output._set_child(tag, attribs={"HAS_TO_PLOT_SAVED": False})
def __init__(self, output): params = output.sim.params self.sum_wavenumbers = output.sum_wavenumbers try: self.vecfft_from_rotfft = output.oper.vecfft_from_rotfft except AttributeError: pass super().__init__( output, period_save=params.output.periods_save.spatial_means, has_to_plot_saved=params.output.spatial_means.HAS_TO_PLOT_SAVED, ) if self.period_save != 0: # saved for each initialization to help detecting bugs self._save_one_time() self.t_last_save = self.sim.time_stepping.t
[docs] def _init_files(self, arrays_1st_time=None): if mpi.rank == 0: if not os.path.exists(self.path_file): self.file = open(self.path_file, "w") else: self.file = open(self.path_file, "r+") # to go to the end of the file, os.SEEK_END)
[docs] def _online_save(self): self()
def __call__(self): """Save the values at one time.""" tsim = self.sim.time_stepping.t if ( tsim + 1e-15 ) // self.period_save > self.t_last_save // self.period_save: self.t_last_save = self.sim.time_stepping.t self._save_one_time()
[docs] def _save_one_time(self, *args): self.t_last_save = self.sim.time_stepping.t
[docs] def _init_online_plot(self): if mpi.rank == 0: width_axe = 0.85 height_axe = 0.4 x_left_axe = 0.12 z_bottom_ax = 0.55 size_ax = [x_left_axe, z_bottom_ax, width_axe, height_axe] fig, ax = self.output.figure_axe(size_axe=size_ax, numfig=3_000_000) self.ax_a = ax ax.set_xlabel("$t$") ax.set_ylabel("$E(t)$") ax.set_title("mean quantities\n" + self.output.summary_simul) z_bottom_ax = 0.08 size_ax[1] = z_bottom_ax ax = fig.add_axes(size_ax) self.axe_b = ax ax.set_xlabel("$t$") ax.set_ylabel(r"$\epsilon(t)$")
[docs] def _get_nb_points_from_lines(self, lines_t, *liness): nt = len(lines_t) liness = [lines for lines in liness if lines] if all(len(lines) == nt for lines in liness): return nt else: # the last line for a quantity may not have been saved yet return nt - 1
[docs] def load(self): dict_results = {} return dict_results
[docs] def load_dataset(self, dims=("t",)): """Loads results as a xarray dataset.""" dict_results = self.load() # NOTE: format specified in # dset = {"coords": {}, "attrs": {}, "dims": dims, "data_vars": {}} for key, value in dict_results.items(): if isinstance(value, np.ndarray): target = "coords" if key in dims else "data_vars" dset[target].update({key: {"dims": dims, "data": value}}) else: dset["attrs"].update({key: value}) return xr.Dataset.from_dict(dset)
[docs] def plot(self): pass
[docs] def compute_time_means(self, tstatio=0.0, tmax=None): """compute the temporal means.""" dict_results = self.load() times = dict_results["t"] imin_mean = np.argmin(abs(times - tstatio)) imax_mean = None if tmax is None else np.argmin(abs(times - tmax)) + 1 dict_time_means = {} for key, value in dict_results.items(): if isinstance(value, np.ndarray): dict_time_means[key] = np.mean(value[imin_mean:imax_mean]) return dict_time_means, dict_results
[docs] def _close_file(self): try: self.file.close() except AttributeError: pass
[docs] def time_first_saved(self) -> float: with open(self.path_file) as file_means: line = "" while not line.startswith("time ="): line = file_means.readline() words = line.split() return float(words[2])
[docs] def time_last_saved(self) -> float: with open(self.path_file, "rb") as file_means: nb_char =, os.SEEK_END) # go to the end nb_char_to_read = min(nb_char, 1000), 2) line = file_means.readline() while line != b"": if line.startswith(b"time ="): line_time = line line = file_means.readline() words = line_time.split() return float(words[2])
[docs] class SpatialMeansJSON(SpatialMeansBase): """Save and load as line-delimited JSON.""" _tag = "spatial_means" _name_file = _tag + ".json"
[docs] def _save_one_time(self, result: Dict[str, float], delimiter: str = "\n"): if mpi.rank == 0: json.dump(result, self.file) self.file.write(delimiter) super()._save_one_time()
[docs] def _file_exists(self): return self.path_file.endswith(".json") and os.path.exists(self.path_file)
[docs] def load(self): if not os.path.exists(self.path_file): raise FileNotFoundError( f"Spatial means file is missing: {self.path_file}" ) try: df = pd.read_json(self.path_file, orient="records", lines=True) return df except ValueError: with open(self.path_file) as fp: for line_nb, line in enumerate(fp): line_nb += 1 try: json.loads(line) except json.JSONDecodeError: break raise IOError( f"Error reading spatial means file {self.path_file} in\n\tline" f"number {line_nb}:\n\t{line}" )
[docs] def load_dataset(self, dims=("t",)): df = self.load() if isinstance(df, dict): return super().load_dataset(dims) else: df.index = df[dims[0]] return xr.Dataset.from_dataframe(df)
[docs] def compute_time_means(self, tstatio=0.0, tmax=None): """compute the temporal means.""" if not self._file_exists(): return super().compute_time_means(tstatio, tmax) df = self.load() times = df.t imin_mean = abs(times - tstatio).idxmin() imax_mean = None if tmax is None else (abs(times - tmax)).idxmin() + 1 df_mean = df.iloc[imin_mean:imax_mean].mean() return df_mean, df
[docs] def time_first_saved(self) -> float: if not self._file_exists(): self.path_file = self.path_file.replace(".json", ".txt") return super().time_first_saved() with open(self.path_file) as file_means: line = file_means.readline() result = json.loads(line) return result["t"]
[docs] def time_last_saved(self) -> float: if not self._file_exists(): self.path_file = self.path_file.replace(".json", ".txt") return super().time_last_saved() with open(self.path_file, "rb") as file_means: nb_char =, os.SEEK_END) # go to the end nb_char_to_read = min(nb_char, 1000), 2) line = file_means.readline() while line != b"": line_prev = line line = file_means.readline() result = json.loads(line_prev) return result["t"]