Source code for fluidsim.base.output.spatial_means

"""Spatial means
================

Provides:

.. autoclass:: SpatialMeansBase
   :members:
   :private-members:
   :noindex:
   :undoc-members:

.. autoclass:: SpatialMeansJSON
   :members:
   :private-members:
   :noindex:
   :undoc-members:

"""

import os
import numpy as np
import json
from typing import Dict

import pandas as pd
import xarray as xr
from fluiddyn.util import mpi

from .base import SpecificOutput


def inner_prod(a_fft, b_fft):
    return np.real(a_fft.conj() * b_fft)


[docs] class SpatialMeansBase(SpecificOutput): """A :class:`SpatialMean` object handles the saving of . This class uses the particular functions defined by some solvers :func:`` and :func``. If the solver doesn't has these functions, this class does nothing. """ _tag = "spatial_means" _name_file = _tag + ".txt"
[docs] @staticmethod def _complete_params_with_default(params): tag = "spatial_means" params.output.periods_save._set_attrib(tag, 0) params.output._set_child(tag, attribs={"HAS_TO_PLOT_SAVED": False})
def __init__(self, output): params = output.sim.params self.sum_wavenumbers = output.sum_wavenumbers try: self.vecfft_from_rotfft = output.oper.vecfft_from_rotfft except AttributeError: pass super().__init__( output, period_save=params.output.periods_save.spatial_means, has_to_plot_saved=params.output.spatial_means.HAS_TO_PLOT_SAVED, ) if self.period_save != 0: # saved for each initialization to help detecting bugs self._save_one_time() self.t_last_save = self.sim.time_stepping.t
[docs] def _init_files(self, arrays_1st_time=None): if mpi.rank == 0: if not os.path.exists(self.path_file): self.file = open(self.path_file, "w") else: self.file = open(self.path_file, "r+") # to go to the end of the file self.file.seek(0, os.SEEK_END)
[docs] def _online_save(self): self()
def __call__(self): """Save the values at one time.""" tsim = self.sim.time_stepping.t if ( tsim + 1e-15 ) // self.period_save > self.t_last_save // self.period_save: self.t_last_save = self.sim.time_stepping.t self._save_one_time()
[docs] def _save_one_time(self, *args): self.t_last_save = self.sim.time_stepping.t
[docs] def _init_online_plot(self): if mpi.rank == 0: width_axe = 0.85 height_axe = 0.4 x_left_axe = 0.12 z_bottom_ax = 0.55 size_ax = [x_left_axe, z_bottom_ax, width_axe, height_axe] fig, ax = self.output.figure_axe(size_axe=size_ax, numfig=3_000_000) self.ax_a = ax ax.set_xlabel("$t$") ax.set_ylabel("$E(t)$") ax.set_title("mean quantities\n" + self.output.summary_simul) z_bottom_ax = 0.08 size_ax[1] = z_bottom_ax ax = fig.add_axes(size_ax) self.axe_b = ax ax.set_xlabel("$t$") ax.set_ylabel(r"$\epsilon(t)$")
[docs] def _get_nb_points_from_lines(self, lines_t, *liness): nt = len(lines_t) liness = [lines for lines in liness if lines] if all(len(lines) == nt for lines in liness): return nt else: # the last line for a quantity may not have been saved yet return nt - 1
[docs] def load(self): dict_results = {} return dict_results
[docs] def load_dataset(self, dims=("t",)): """Loads results as a xarray dataset.""" dict_results = self.load() # NOTE: format specified in # http://xarray.pydata.org/en/stable/generated/xarray.Dataset.from_dict.html dset = {"coords": {}, "attrs": {}, "dims": dims, "data_vars": {}} for key, value in dict_results.items(): if isinstance(value, np.ndarray): target = "coords" if key in dims else "data_vars" dset[target].update({key: {"dims": dims, "data": value}}) else: dset["attrs"].update({key: value}) return xr.Dataset.from_dict(dset)
[docs] def plot(self): pass
[docs] def compute_time_means(self, tstatio=0.0, tmax=None): """compute the temporal means.""" dict_results = self.load() times = dict_results["t"] imin_mean = np.argmin(abs(times - tstatio)) imax_mean = None if tmax is None else np.argmin(abs(times - tmax)) + 1 dict_time_means = {} for key, value in dict_results.items(): if isinstance(value, np.ndarray): dict_time_means[key] = np.mean(value[imin_mean:imax_mean]) return dict_time_means, dict_results
[docs] def _close_file(self): try: self.file.close() except AttributeError: pass
[docs] def time_first_saved(self) -> float: with open(self.path_file) as file_means: line = "" while not line.startswith("time ="): line = file_means.readline() words = line.split() return float(words[2])
[docs] def time_last_saved(self) -> float: with open(self.path_file, "rb") as file_means: nb_char = file_means.seek(0, os.SEEK_END) # go to the end nb_char_to_read = min(nb_char, 1000) file_means.seek(-nb_char_to_read, 2) line = file_means.readline() while line != b"": if line.startswith(b"time ="): line_time = line line = file_means.readline() words = line_time.split() return float(words[2])
[docs] class SpatialMeansJSON(SpatialMeansBase): """Save and load as line-delimited JSON.""" _tag = "spatial_means" _name_file = _tag + ".json"
[docs] def _save_one_time(self, result: Dict[str, float], delimiter: str = "\n"): if mpi.rank == 0: json.dump(result, self.file) self.file.write(delimiter) super()._save_one_time()
[docs] def _file_exists(self): return self.path_file.endswith(".json") and os.path.exists(self.path_file)
[docs] def load(self): if not os.path.exists(self.path_file): raise FileNotFoundError( f"Spatial means file is missing: {self.path_file}" ) try: df = pd.read_json(self.path_file, orient="records", lines=True) return df except ValueError: with open(self.path_file) as fp: for line_nb, line in enumerate(fp): line_nb += 1 try: json.loads(line) except json.JSONDecodeError: break raise IOError( f"Error reading spatial means file {self.path_file} in\n\tline" f"number {line_nb}:\n\t{line}" )
[docs] def load_dataset(self, dims=("t",)): df = self.load() if isinstance(df, dict): return super().load_dataset(dims) else: df.index = df[dims[0]] return xr.Dataset.from_dataframe(df)
[docs] def compute_time_means(self, tstatio=0.0, tmax=None): """compute the temporal means.""" if not self._file_exists(): return super().compute_time_means(tstatio, tmax) df = self.load() times = df.t imin_mean = abs(times - tstatio).idxmin() imax_mean = None if tmax is None else (abs(times - tmax)).idxmin() + 1 df_mean = df.iloc[imin_mean:imax_mean].mean() return df_mean, df
[docs] def time_first_saved(self) -> float: if not self._file_exists(): self.path_file = self.path_file.replace(".json", ".txt") return super().time_first_saved() with open(self.path_file) as file_means: line = file_means.readline() result = json.loads(line) return result["t"]
[docs] def time_last_saved(self) -> float: if not self._file_exists(): self.path_file = self.path_file.replace(".json", ".txt") return super().time_last_saved() with open(self.path_file, "rb") as file_means: nb_char = file_means.seek(0, os.SEEK_END) # go to the end nb_char_to_read = min(nb_char, 1000) file_means.seek(-nb_char_to_read, 2) line = file_means.readline() while line != b"": line_prev = line line = file_means.readline() result = json.loads(line_prev) return result["t"]