Source code for fluidsim.base.output.spatiotemporal_spectra

Spatiotemporal Spectra


.. autoclass:: SpatioTemporalSpectra3D

.. autoclass:: SpatioTemporalSpectra2D

.. autoclass:: SpatioTemporalSpectraNS

from pathlib import Path
from logging import warn
from math import pi

import numpy as np
from scipy import signal
import h5py
from rich.progress import Progress
from fluidsim.util import ensure_radians

from fluiddyn.util import mpi
from fluidsim.util import open_patient
from fluidsim.base.output.base import SpecificOutput

from transonic import boost, Array, Type

Uf32f64 = Type(np.float32, np.float64)
A = Array[Uf32f64, "1d"]

def find_index_first_geq(arr: A, value: Uf32f64):
    """find the first index such that `arr[index] >= value`"""
    for i, v in enumerate(arr):
        if v >= value:
            return i
    print("arr", arr)
    raise ValueError(f"No index such that `arr[index] >= value (={value:.8g})`")

def find_index_first_g(arr: A, value: Uf32f64):
    """find the first index such that `arr[index] > value`"""
    for i, v in enumerate(arr):
        if v > value:
            return i
    print("arr", arr)
    raise ValueError(f"No index such that `arr[index] >= value (={value:.8g})`")

def find_index_first_l(arr: A, value: Uf32f64):
    """find the first index such that `arr[index] < value`"""
    for i, v in enumerate(arr):
        if v < value:
            return i
    print("arr", arr)
    raise ValueError(f"No index such that `arr[index] >= value (={value:.8g})`")

def filter_tmins_paths(tmin, tmins, paths):
    if tmins.size == 1:
        return tmins, paths
    delta_tmin = np.diff(tmins).min()
    start = find_index_first_l(tmin - tmins, delta_tmin)
    return tmins[start:], paths[start:]

def sort_files_tmin(paths, tmins=None):
    if not isinstance(paths, list):
        paths = list(paths)
    if tmins is None:
        tmins = np.array([float([14:-3]) for p in paths])
    return [
        path for (path, _) in sorted(zip(paths, tmins), key=lambda pair: pair[1])

def get_arange_minmax(times: A, tmin: Uf32f64, tmax: Uf32f64):
    """get a range of index for which `tmin <= times[i] <= tmax`

    This assumes that `times` is sorted.


    if tmin <= times[0]:
        start = 0
        start = find_index_first_geq(times, tmin)

    if tmax >= times[-1]:
        stop = len(times)
        stop = find_index_first_g(times, tmax)

    return np.arange(start, stop)

[docs] class SpatioTemporalSpectra3D(SpecificOutput): """ Computes the spatiotemporal spectra. """ _tag = "spatiotemporal_spectra" nb_dim = 3
[docs] @classmethod def _complete_params_with_default(cls, params): params.output.periods_save._set_attrib(cls._tag, 0) params.output._set_child( cls._tag, attribs={ "probes_region": None, "file_max_size": 10.0, # MB "SAVE_AS_COMPLEX64": True, }, ) params.output.spatiotemporal_spectra._set_doc( """ probes_region: int tuple (default:None) Boundaries of the region to record in the spectral domain. probes_region = (ikxmax, ikymax, ikzmax), in nondimensional units (mode indices). The resulting ranges for each wavevectors are : ikx in [0 , ikxmax] iky in [-ikymax+1 , ikymax] ikz in [-ikzmax+1 , ikzmax] If None, set all ikmax = 4. file_max_size: float (default: 10.0) Maximum size of one time series file, in megabytes. SAVE_AS_COMPLEX64: bool (default: True) If set to False, probes data is saved as complex128. Warning : saving as complex128 reduces digital noise at high frequency, but doubles the size of the output! """ )
def __init__(self, output): params = output.sim.params try: params_st_spec = params.output.spatiotemporal_spectra except AttributeError: warn( "Cannot initialize spatiotemporal spectra output because " "`params` does not contain parameters for this class." ) return super().__init__( output, period_save=params.output.periods_save.spatiotemporal_spectra, ) oper = self.sim.oper # Parameters self.period_save = params.output.periods_save.spatiotemporal_spectra self.path_dir = Path(self.sim.output.path_run) / "spatiotemporal" self.keys_fields = self.sim.info_solver.classes.State.keys_state_phys if not output._has_to_save: self.period_save = 0.0 if self.period_save == 0.0: return if params_st_spec.probes_region is not None: if self.nb_dim == 3: ikxmax, ikymax, ikzmax = params_st_spec.probes_region else: ikxmax, ikymax = params_st_spec.probes_region else: ikxmax = ikymax = 4 if self.nb_dim == 3: ikzmax = 4 ikxmax = min(int(round(ikxmax)), params.oper.nx // 2) ikymax = min(int(round(ikymax)), params.oper.ny // 2) if self.nb_dim == 3: ikzmax = min(int(round(ikzmax)), // 2) self.probes_region = ikxmax, ikymax, ikzmax else: self.probes_region = ikxmax, ikymax self.file_max_size = params_st_spec.file_max_size self.SAVE_AS_COMPLEX64 = params_st_spec.SAVE_AS_COMPLEX64 # region must be int tuple ikxmax = int(ikxmax) ikymax = int(ikymax) if self.nb_dim == 3: ikzmax = int(ikzmax) # dimensions order in Fourier space if self.nb_dim == 3: self.dims_order = np.array(oper.oper_fft.get_dimX_K()) else: self.dims_order = np.arange(2) if oper.oper_fft.get_is_transposed(): self.dims_order = self.dims_order[::-1] # data directory if mpi.rank == 0: self.path_dir.mkdir(exist_ok=True) if mpi.nb_proc > 1: mpi.comm.barrier() # data type and size if self.SAVE_AS_COMPLEX64: self.datatype = np.complex64 size_1_number = 8e-6 else: self.datatype = np.complex128 size_1_number = 16e-6 # check for existing files paths = sort_files_tmin(self.path_dir.glob("rank*.h5")) if paths: # check values in files with open_patient(paths[0], "r") as file: if file.attrs["nb_proc"] != mpi.nb_proc: raise ValueError( f"process number ({mpi.nb_proc}) is different from " f"process number in file ({file.attrs['nb_proc']})" ) if (file.attrs["dims_order"] != self.dims_order).any(): raise ValueError("dimensions order is different from files") if (file.attrs["probes_region"] != self.probes_region).any(): raise ValueError("probes region is different from files") # init from files INIT_FROM_PARAMS = False paths_rank = [ p for p in paths if"rank{mpi.rank:05}") ] if paths_rank: self.path_file = paths_rank[-1] with open_patient(self.path_file, "r") as file: self.index_file = file.attrs["index_file"] self.probes_k0adim_loc = file["probes_k0adim_loc"][:] self.probes_ik0_loc = file["probes_ik0_loc"][:] self.probes_k1adim_loc = file["probes_k1adim_loc"][:] self.probes_ik1_loc = file["probes_ik1_loc"][:] if self.nb_dim == 3: self.probes_k2adim_loc = file["probes_k2adim_loc"][:] self.probes_ik2_loc = file["probes_ik2_loc"][:] self.probes_nb_loc = self.probes_ik0_loc.size self.number_times_in_file = file["times"].size self.t_last_save = file["times"][-1] else: # no probes in proc self.path_file = None self.index_file = 0 self.number_times_in_file = 0 self.probes_nb_loc = 0 self.probes_ik0_loc = [] self.probes_ik1_loc = [] self.probes_ik2_loc = [] with open_patient(paths[-1], "r") as file: self.t_last_save = file["times"][-1] else: # no files were found : initialize from params INIT_FROM_PARAMS = True if self.nb_dim == 3: # pair kx,ky,kz with k0,k1,k2 iksmax = np.array([ikzmax, ikymax, ikxmax]) iksmin = np.array([1 - ikzmax, 1 - ikymax, 0]) ik0max, ik1max, ik2max = iksmax[self.dims_order] ik0min, ik1min, ik2min = iksmin[self.dims_order] # local probes indices ( k0_adim_loc, k1_adim_loc, k2_adim_loc, ) = oper.oper_fft.get_k_adim_loc() K0_adim, K1_adim, K2_adim = np.meshgrid( k0_adim_loc, k1_adim_loc, k2_adim_loc, indexing="ij" ) else: iksmax = np.array([ikymax, ikxmax]) iksmin = np.array([1 - ikymax, 0]) ik0max, ik1max = iksmax[self.dims_order] ik0min, ik1min = iksmin[self.dims_order] kx_adim_loc = np.array( (oper.kx_loc / oper.deltakx).round(), dtype=int ) ky_adim_loc = np.array( (oper.ky_loc / oper.deltaky).round(), dtype=int ) if oper.oper_fft.get_is_transposed(): k0_adim_loc = kx_adim_loc k1_adim_loc = ky_adim_loc else: k0_adim_loc = ky_adim_loc k1_adim_loc = kx_adim_loc K0_adim, K1_adim = np.meshgrid( k0_adim_loc, k1_adim_loc, indexing="ij" ) cond_region = ( (K0_adim >= ik0min) & (K0_adim <= ik0max) & (K1_adim >= ik1min) & (K1_adim <= ik1max) ) if self.nb_dim == 3: cond_region = ( cond_region & (K2_adim >= ik2min) & (K2_adim <= ik2max) ) if self.nb_dim == 3: ( self.probes_ik0_loc, self.probes_ik1_loc, self.probes_ik2_loc, ) = np.where(cond_region) else: ( self.probes_ik0_loc, self.probes_ik1_loc, ) = np.where(cond_region) self.probes_nb_loc = self.probes_ik0_loc.size # local probes wavenumbers (nondimensional) self.probes_k0adim_loc = self._get_data_probe_from_field(K0_adim) self.probes_k1adim_loc = self._get_data_probe_from_field(K1_adim) if self.nb_dim == 3: self.probes_k2adim_loc = self._get_data_probe_from_field(K2_adim) # initialize files info self.index_file = 0 self.number_times_in_file = 0 self.t_last_save = -self.period_save # size of a single write: nb_fields * probes_nb_loc + time probes_write_size = ( len(self.keys_fields) * self.probes_nb_loc + 1 ) * size_1_number self.max_number_times_in_file = int( self.file_max_size / probes_write_size ) # initialize files if INIT_FROM_PARAMS and self.probes_nb_loc > 0: self._init_new_file(tmin_file=self.sim.time_stepping.t)
[docs] def _init_files(self, arrays_1st_time=None): # we don't want to do anything when this function is called. pass
[docs] def _init_new_file(self, tmin_file=None): """Initializes a new file""" if tmin_file is not None: # max number of digits = int(log10(t_end)) + 1 # add .3f precision = 4 additional characters # +2 by anticipation of potential restarts str_width = int(np.log10(self.sim.params.time_stepping.t_end)) + 7 ind_str = f"tmin{tmin_file:0{str_width}.3f}" else: ind_str = f"file{self.index_file:04}" self.path_file = self.path_dir / f"rank{mpi.rank:05}_{ind_str}.h5" with open_patient(self.path_file, "w") as file: file.attrs["nb_proc"] = mpi.nb_proc file.attrs["dims_order"] = self.dims_order file.attrs["index_file"] = self.index_file file.attrs["probes_region"] = self.probes_region file.attrs["period_save"] = self.period_save file.attrs["max_number_times_in_file"] = self.max_number_times_in_file create_ds = file.create_dataset create_ds("probes_k0adim_loc", data=self.probes_k0adim_loc) create_ds("probes_ik0_loc", data=self.probes_ik0_loc) create_ds("probes_k1adim_loc", data=self.probes_k1adim_loc) create_ds("probes_ik1_loc", data=self.probes_ik1_loc) if self.nb_dim == 3: create_ds("probes_k2adim_loc", data=self.probes_k2adim_loc) create_ds("probes_ik2_loc", data=self.probes_ik2_loc) for key in self.keys_fields: create_ds( key + "_Fourier_loc", (self.probes_nb_loc, 1), maxshape=(self.probes_nb_loc, None), dtype=self.datatype, ) create_ds("times", (1,), maxshape=(None,))
[docs] def _write_to_file(self, data): """Writes a file with the temporal data""" with open_patient(self.path_file, "a") as file: for k, v in data.items(): dset = file[k] if k.startswith("times"): dset.resize((self.number_times_in_file,)) dset[-1] = v else: dset.resize((self.probes_nb_loc, self.number_times_in_file)) dset[:, -1] = v
[docs] def _add_probes_data_to_dict(self, data, key): """Probes fields in Fourier space and append data to a dict object""" data[key + "_Fourier_loc"] = self._get_data_probe_from_field( self.sim.state.get_var(f"{key}_fft") )
[docs] def _online_save(self): """Prepares data and writes to file""" tsim = self.sim.time_stepping.t if ( tsim + 1e-15 ) // self.period_save > self.t_last_save // self.period_save: # if max write number is reached, init new file if self.number_times_in_file >= self.max_number_times_in_file: self.index_file += 1 self.number_times_in_file = 0 self._init_new_file(tmin_file=self.sim.time_stepping.t) # get data from probes data = {"times": self.sim.time_stepping.t} for key in self.keys_fields: self._add_probes_data_to_dict(data, key) # write to file self.number_times_in_file += 1 if self.probes_nb_loc > 0: self._write_to_file(data) self.t_last_save = tsim
[docs] def load_time_series(self, keys=None, tmin=0, tmax=None, dtype=None): """load time series from files""" if mpi.nb_proc > 1: raise RuntimeError( "This postprocessing function should not be called with MPI." ) if keys is None: keys = self.keys_fields # get ranks paths = sort_files_tmin(self.path_dir.glob("rank*.h5")) ranks = sorted({int([4:9]) for p in paths}) # get times and dimensions order from the files of first rank print("load times series...") paths_1st_rank = [ p for p in paths if"rank{ranks[0]:05}") ] with open_patient(paths_1st_rank[0], "r") as file: dims_order = file.attrs["dims_order"] region = file.attrs["probes_region"] if dtype is None: dtype = file[keys[0] + "_Fourier_loc"].dtype # get list of useful files, from tmin tmins_files = np.array([float([14:-3]) for p in paths_1st_rank]) tmins_files, paths_1st_rank = filter_tmins_paths( tmin, tmins_files, paths_1st_rank ) paths_1st_rank = sort_files_tmin(paths_1st_rank, tmins_files) tmins_files = sorted(tmins_files) if tmax is None: with open_patient(paths_1st_rank[-1], "r") as file: tmax = file["/times"][-1] with Progress() as progress: npaths = len(paths_1st_rank) task_files = progress.add_task( "Getting times from rank 0...", total=npaths ) times = [] for ip, path in enumerate(paths_1st_rank): with open_patient(path, "r") as file: if tmins_files[ip] > tmax: progress.update(task_files, completed=npaths) break times_file = file["times"][:] cond_times = (times_file >= tmin) & (times_file <= tmax) times.append(times_file[cond_times]) progress.update(task_files, advance=1) times = np.concatenate(times) tmin = times.min() tmax = times.max() print(f"tmin={tmin:8.6g}, tmax={tmax:8.6g}, nit={times.size}") # get sequential shape of Fourier space if self.nb_dim == 3: ikxmax, ikymax, ikzmax = region iksmax = np.array([ikzmax, ikymax, ikxmax]) iksmin = np.array([1 - ikzmax, 1 - ikymax, 0]) ik0max, ik1max, ik2max = iksmax[dims_order] ik0min, ik1min, ik2min = iksmin[dims_order] shape_series = ( ik0max + 1 - ik0min, ik1max + 1 - ik1min, ik2max + 1 - ik2min, times.size, ) else: ikxmax, ikymax = region iksmax = np.array([ikymax, ikxmax]) iksmin = np.array([1 - ikymax, 0]) ik0max, ik1max = iksmax[dims_order] ik0min, ik1min = iksmin[dims_order] shape_series = ( ik0max + 1 - ik0min, ik1max + 1 - ik1min, times.size, ) # load series, rebuild as state_spect arrays + time series = { f"{k}_Fourier": np.empty(shape_series, dtype=dtype) for k in keys } with Progress() as progress: task_ranks = progress.add_task("Rearranging...", total=len(ranks)) task_files = progress.add_task("Rank 00000...", total=npaths) # loop on all files, rank by rank for rank in ranks: paths_rank = [ p for p in paths if"rank{rank:05}") ] # get list of useful files, from tmin tmins_files = np.array([float([14:-3]) for p in paths_rank]) tmins_files, paths_rank = filter_tmins_paths( tmin, tmins_files, paths_rank ) npaths = len(paths_rank) progress.update( task_files, description=f"Rank {rank:05}...", total=npaths, completed=0, ) # for a given rank, paths are sorted by time for ip, path_file in enumerate(paths_rank): # break after the last useful file if tmins_files[ip] > tmax: progress.update(task_files, completed=npaths) break with open_patient(path_file, "r") as file: # time indices times_file = file["times"][:] if times_file[-1] < tmin: progress.update(task_files, advance=1) continue its_file = get_arange_minmax(times_file, tmin, tmax) tmin_keep = times_file[its_file[0]] tmax_keep = times_file[its_file[-1]] its = get_arange_minmax(times, tmin_keep, tmax_keep) # k_adim_loc = global probes indices! ik0 = file["probes_k0adim_loc"][:] ik1 = file["probes_k1adim_loc"][:] if self.nb_dim == 3: ik2 = file["probes_k2adim_loc"][:] # load data at desired times for all keys_fields for key in keys: skey = key + "_Fourier" data = file[skey + "_loc"][:, its_file] if self.nb_dim == 3: for i, it in enumerate(its): series[skey][ik0, ik1, ik2, it] = data[:, i] else: for i, it in enumerate(its): series[skey][ik0, ik1, it] = data[:, i] # update rich task progress.update(task_files, advance=1) # update rich task progress.update(task_ranks, advance=1) # add Ki_adim arrays, times and dims order k0_adim = np.r_[0 : ik0max + 1, ik0min:0] k1_adim = np.r_[0 : ik1max + 1, ik1min:0] if self.nb_dim == 3: k2_adim = np.r_[0 : ik2max + 1, ik2min:0] K0_adim, K1_adim, K2_adim = np.meshgrid( k0_adim, k1_adim, k2_adim, indexing="ij" ) else: K0_adim, K1_adim = np.meshgrid(k0_adim, k1_adim, indexing="ij") series.update( { "K0_adim": K0_adim, "K1_adim": K1_adim, "times": times, "dims_order": dims_order, } ) if self.nb_dim == 3: series["K2_adim"] = K2_adim return series
[docs] def _compute_spectrum(self, data): if not hasattr(self, "f_sample"): paths = sorted(self.path_dir.glob("rank*.h5")) with h5py.File(paths[0], "r") as file: self.f_sample = 1.0 / file.attrs["period_save"] self.domega = 2 * pi * self.f_sample / data.shape[-1] # TODO: I'm not sure if detrend=False is good in prod, but it's much # better for testing freq, spectrum = signal.periodogram( data, fs=self.f_sample, scaling="spectrum", detrend=False, return_onesided=False, ) return freq, spectrum / self.domega
[docs] def compute_spectra(self, tmin=0, tmax=None, dtype=None): """compute spatiotemporal spectra from files""" # load time series as state_spect arrays + times series = self.load_time_series(tmin=tmin, tmax=tmax, dtype=dtype) # compute spectra print("performing time fft...") spectra = {k: v for k, v in series.items() if k.startswith("K")} for key, data in series.items(): if "_Fourier" not in key: continue key_spectrum = "spectrum_" + key.split("_Fourier")[0] freq, spectrum = self._compute_spectrum(data) spectra[key_spectrum] = spectrum spectra["omegas"] = 2 * pi * freq spectra["dims_order"] = series["dims_order"] return spectra
[docs] def _get_data_probe_from_field(self, field): return field[ self.probes_ik0_loc, self.probes_ik1_loc, self.probes_ik2_loc, ]
[docs] class SpatioTemporalSpectra2D(SpatioTemporalSpectra3D): nb_dim = 2
[docs] def _get_data_probe_from_field(self, field): return field[ self.probes_ik0_loc, self.probes_ik1_loc, ]
def _complete_name(name, dtype, save_urud): if dtype is not None: name += f"_{dtype}" if save_urud: name += "_urud" return name + ".h5"
[docs] class SpatioTemporalSpectraNS:
[docs] def _get_path_saved_spectra(self, tmin, tmax, dtype, save_urud): if tmax is None: tmax = self._get_default_tmax() # we first check if a file corresponds to tmin and tmax # but we don't know how tmin/tmax are formatted for_glob = _complete_name("periodogram_*_*", dtype, save_urud) for path in self.path_dir.glob(for_glob): if "_temporal" in continue if (tmin, tmax) == tuple(float(s) for s in path.stem.split("_")[1:3]): return path name = _complete_name( f"periodogram_{float(tmin)}_{float(tmax)}", dtype, save_urud ) return self.path_dir / name
[docs] def _get_path_saved_tspectra(self, tmin, tmax, dtype, save_urud): if tmax is None: tmax = self._get_default_tmax() # we first check if a file corresponds to tmin and tmax # but we don't know how tmin/tmax are formatted for_glob = _complete_name("periodogram_temporal_*_*", dtype, save_urud) for path in self.path_dir.glob(for_glob): if (tmin, tmax) == tuple(float(s) for s in path.stem.split("_")[2:4]): return path name = _complete_name( f"periodogram_temporal_{float(tmin)}_{float(tmax)}", dtype, save_urud ) return self.path_dir / name
[docs] def save_spectra_kzkhomega( self, tmin=0, tmax=None, dtype=None, save_urud=False ): """ save: - the spatiotemporal spectra, with a cylindrical average in k-space - the temporal spectra, with an average on the whole k-space """ if tmax is None: tmax = self._get_default_tmax() # compute spectra print("Computing spectra...") spectra = self.compute_spectra(tmin=tmin, tmax=tmax, dtype=dtype) # get kz, kh params_oper = self.sim.params.oper deltakx = 2 * pi / params_oper.Lx order = spectra["dims_order"] KX = deltakx * spectra[f"K{order[-1]}_adim"] kx_max = self.sim.params.oper.nx // 2 * deltakx if self.nb_dim == 3: deltaky = 2 * pi / params_oper.Ly deltakz = 2 * pi / params_oper.Lz _deltakhs = [deltakx, deltaky] _ideltakh = np.argmax(_deltakhs) deltakh = _deltakhs[_ideltakh] KY = deltaky * spectra[f"K{order[1]}_adim"] KH = np.sqrt(KX**2 + KY**2) khmax_spectra = [KX, KY][_ideltakh].max() del KY else: # in 2d, vertical (here "z") is y deltakz = 2 * pi / params_oper.Ly KH = abs(KX) deltakh = deltakx khmax_spectra = KX.max() KZ = deltakz * spectra[f"K{order[0]}_adim"] kz_spectra = np.arange(0, KZ.max() + 1e-15, deltakz) nkh_spectra = max(2, int(khmax_spectra / deltakh)) kh_spectra = deltakh * np.arange(nkh_spectra) # get one-sided frequencies omegas = spectra["omegas"] nomegas = (omegas.size + 1) // 2 omegas_onesided = abs(omegas[:nomegas]) # kzkhomega : perform cylindrical average # temporal spectra : average on Fourier space spectra_kzkhomega = { "kz_spectra": kz_spectra, "kh_spectra": kh_spectra, "omegas": omegas_onesided, } tspectra = {"omegas": omegas_onesided} for key, data in spectra.items(): if not key.startswith("spectrum_"): continue spectra_kzkhomega[key] = self.compute_spectrum_kzkhomega( np.ascontiguousarray(data), kh_spectra, kz_spectra, KX, KZ, KH ) tspectrum = self._sum_wavenumber(data, KX, kx_max) # one-sided frequencies tspectrum_onesided = np.zeros(nomegas) tspectrum_onesided[0] = tspectrum[0] tspectrum_onesided[1:] = ( tspectrum[1:nomegas] + tspectrum[-1:-nomegas:-1] ) tspectra[key] = tspectrum_onesided del spectra # total kinetic energy if self.nb_dim == 3: spectra_kzkhomega["spectrum_K"] = 0.5 * ( spectra_kzkhomega["spectrum_vx"] + spectra_kzkhomega["spectrum_vy"] + spectra_kzkhomega["spectrum_vz"] ) tspectra["spectrum_K"] = 0.5 * ( tspectra["spectrum_vx"] + tspectra["spectrum_vy"] + tspectra["spectrum_vz"] ) else: spectra_kzkhomega["spectrum_K"] = 0.5 * ( spectra_kzkhomega["spectrum_ux"] + spectra_kzkhomega["spectrum_uy"] ) tspectra["spectrum_K"] = 0.5 * ( tspectra["spectrum_ux"] + tspectra["spectrum_uy"] ) # potential energy try: N = self.sim.params.N spectra_kzkhomega["spectrum_A"] = ( 0.5 / N**2 * spectra_kzkhomega["spectrum_b"] ) tspectra["spectrum_A"] = 0.5 / N**2 * tspectra["spectrum_b"] except AttributeError: pass # save to files path_file = self._get_path_saved_spectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "w") as file: file.attrs["tmin"] = tmin file.attrs["tmax"] = tmax for key, val in spectra_kzkhomega.items(): file.create_dataset(key, data=val) path_file = self._get_path_saved_tspectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "w") as file: file.attrs["tmin"] = tmin file.attrs["tmax"] = tmax for key, val in tspectra.items(): file.create_dataset(key, data=val) # toroidal/poloidal decomposition if save_urud: print("Computing ur, ud spectra...") spectra_urud_kzkhomega = {} tspectra_urud = {} spectra = self.compute_spectra_urud(tmin=tmin, tmax=tmax, dtype=dtype) for key, data in spectra.items(): if not key.startswith("spectrum_"): continue spectra_urud_kzkhomega[key] = self.compute_spectrum_kzkhomega( np.ascontiguousarray(data), kh_spectra, kz_spectra, KX, KZ, KH ) spectra_kzkhomega[key] = spectra_urud_kzkhomega[key] tspectrum = self._sum_wavenumber(data, KX, kx_max) # one-sided frequencies tspectrum_onesided = np.zeros(nomegas) tspectrum_onesided[0] = tspectrum[0] tspectrum_onesided[1:] = ( tspectrum[1:nomegas] + tspectrum[-1:-nomegas:-1] ) tspectra_urud[key] = tspectrum_onesided tspectra[key] = tspectra_urud[key] path_file = self._get_path_saved_spectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "a") as file: for key, val in spectra_urud_kzkhomega.items(): file.create_dataset(key, data=val) path_file = self._get_path_saved_tspectra( tmin, tmax, dtype, save_urud ) with h5py.File(path_file, "a") as file: for key, val in tspectra_urud.items(): file.create_dataset(key, data=val) return spectra_kzkhomega, tspectra
[docs] def load_spectra_kzkhomega( self, tmin=0, tmax=None, dtype=None, save_urud=False ): """load kzkhomega spectra from file""" spectra = {} path_file = self._get_path_saved_spectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "r") as file: for key in file.keys(): spectra[key] = file[key][...] return spectra
[docs] def compute_omega_emp_vs_kzkh( self, spectra_kzkhomega, key_spect="spectrum_b", ): r"""Compute empirical frequency and fluctuation from the spatiotemporal spectra: .. math:: \omega_{emp}(k_h, k_z) = \frac{\int ~ \omega ~ S(k_h, k_z, \omega) ~ \mathrm{d}\omega}{\int ~ S(k_h, k_z, \omega) ~ \mathrm{d}\omega}, \delta \omega_{emp}(k_h, k_z) = \sqrt{\frac{\int ~ (\omega - \omega_{emp})^2 ~ S(k_h, k_z, \omega) ~ \mathrm{d}\omega}{\int ~ S(k_h, k_z, \omega) ~ \mathrm{d}\omega}}, where :math:`\omega_{emp}` is the empirical frequency and :math:`\delta \omega_{emp}` is the empirical frequency fluctuation. :math:`S(k_h, k_z, \omega)` is the spectra of `key_spect`. """ spectrum = spectra_kzkhomega[key_spect] kh_spectra = spectra_kzkhomega["kh_spectra"] kz_spectra = spectra_kzkhomega["kz_spectra"] omegas = spectra_kzkhomega["omegas"] # khv, kzv = np.meshgrid(kh_spectra, kz_spectra) omega_emp = np.zeros((len(kz_spectra), len(kh_spectra))) delta_omega_emp = np.zeros((len(kz_spectra), len(kh_spectra))) omega_norm = np.zeros((len(kz_spectra), len(kh_spectra))) # we compute omega_emp first for io in range(len(omegas)): omega_emp += omegas[io] * spectrum[:, :, io] omega_norm += spectrum[:, :, io] omega_emp = omega_emp / omega_norm # then we conpute delta_omega_emp for io in range(len(omegas)): delta_omega_emp += ((omegas[io] - omega_emp) ** 2) * spectrum[ :, :, io ] delta_omega_emp = (np.divide(delta_omega_emp, omega_norm)) ** 0.5 return omega_emp, delta_omega_emp
[docs] def plot_kzkhomega( self, key_field="b", tmin=0, tmax=None, dtype=None, equation=None, xmax=None, ymax=None, cmap=None, vmin=None, vmax=None, plot_omega_emp=False, linscale=False, ): """plot the spatiotemporal spectra, with a cylindrical average in k-space equation must start with 'omega=', 'kh=', 'kz=', 'ikh=' or 'ikz='. For 3d solvers, `key_field` can be in `State.keys_state_phys + ["Khd", "Khr", "Kp"]`. """ keys_plot = self.keys_fields.copy() if self.nb_dim == 3: keys_plot.extend(["Khd", "Khr", "Kp"]) if key_field is None: key_field = keys_plot[0] if key_field not in keys_plot: raise KeyError(f"possible keys are {keys_plot}") if tmax is None: tmax = self._get_default_tmax() key_spect = "spectrum_" + key_field if key_field.startswith("Kh") or key_field.startswith("Kp"): save_urud = True else: save_urud = False path_file = self._get_path_saved_spectra(tmin, tmax, dtype, save_urud) path_urud = self._get_path_saved_spectra(tmin, tmax, dtype, True) if path_urud.exists() and not path_file.exists(): path_file = path_urud # compute and save spectra if needed if not path_file.exists(): if self.nb_dim == 3: self.save_spectra_kzkhomega( tmin=tmin, tmax=tmax, dtype=dtype, save_urud=save_urud ) else: self.save_spectra_kzkhomega(tmin=tmin, tmax=tmax, dtype=dtype) # load spectrum spectra_kzkhomega = {} with h5py.File(path_file, "r") as file: if key_spect.startswith("spectrum_Kp"): spectrum = file["spectrum_Khd"][:] + 0.5 * file["spectrum_vz"][:] else: spectrum = file[key_spect][...] if dtype == "complex64": float_dtype = "float32" elif dtype == "complex128": float_dtype = "float64" if dtype: spectrum = spectrum.astype(float_dtype) spectra_kzkhomega[key_spect] = spectrum spectra_kzkhomega["kh_spectra"] = file["kh_spectra"][...] spectra_kzkhomega["kz_spectra"] = file["kz_spectra"][...] spectra_kzkhomega["omegas"] = file["omegas"][...] # compute omega_emp if asked if plot_omega_emp: omega_emp, delta_omega_emp = self.compute_omega_emp_vs_kzkh( spectra_kzkhomega=spectra_kzkhomega, key_spect=key_spect ) # slice along equation if equation is None: equation = f"omega=0" elif equation.startswith("kh="): kh = eval(equation[len("kh=") :]) kh_spectra = spectra_kzkhomega["kh_spectra"] ikh = abs(kh_spectra - kh).argmin() equation = f"ikh={ikh}" elif equation.startswith("kz="): kz = eval(equation[len("kz=") :]) kz_spectra = spectra_kzkhomega["kz_spectra"] ikz = abs(kz_spectra - kz).argmin() equation = f"ikz={ikz}" if equation.startswith("omega="): try: variables = dict(N=self.sim.params.N) except AttributeError: variables = dict() omega = eval(equation[len("omega=") :], variables) omegas = spectra_kzkhomega["omegas"] iomega = abs(omegas - omega).argmin() spect = spectra_kzkhomega[key_spect][:, :, iomega] xaxis = np.arange(spectra_kzkhomega["kh_spectra"].size) yaxis = np.arange(spectra_kzkhomega["kz_spectra"].size) xlabel = r"$k_h/\delta k_h$" ylabel = r"$k_z/\delta k_z$" omega = omegas[iomega] equation = r"$\omega=$" + f"{omega:.2g}" # use reduced frequency for stratified fluids try: N = self.sim.params.N equation = r"$\omega/N=$" + f"{omega/N:.2g}" except AttributeError: pass elif equation.startswith("ikh="): ikh = eval(equation[len("ikh=") :]) kh_spectra = spectra_kzkhomega["kh_spectra"] spect = spectra_kzkhomega[key_spect][:, ikh, :].transpose() if plot_omega_emp: omega_emp = omega_emp[:, ikh] delta_omega_emp = delta_omega_emp[:, ikh] xaxis = np.arange(spectra_kzkhomega["kz_spectra"].size) yaxis = spectra_kzkhomega["omegas"] # use reduced frequency for stratified fluids try: N = self.sim.params.N yaxis /= N except AttributeError: pass xlabel = r"$k_z/\delta k_z$" ylabel = r"$\omega/N$" kh = kh_spectra[ikh] equation = f"$k_h = {ikh}\\delta k_h = {kh:.2g}$" elif equation.startswith("ikz="): ikz = eval(equation[len("ikz=") :]) kz_spectra = spectra_kzkhomega["kz_spectra"] spect = spectra_kzkhomega[key_spect][ikz, :, :].transpose() if plot_omega_emp: omega_emp = omega_emp[ikz, :] delta_omega_emp = delta_omega_emp[ikz, :] xaxis = np.arange(spectra_kzkhomega["kh_spectra"].size) yaxis = spectra_kzkhomega["omegas"] # use reduced frequency for stratified fluids try: N = self.sim.params.N yaxis /= N except AttributeError: pass xlabel = r"$k_h/\delta k_h$" ylabel = r"$\omega/N$" kz = kz_spectra[ikz] equation = f"$k_z = {ikz}\\delta k_z = {kz:.2g}$" else: raise NotImplementedError( "equation must start with 'omega=', 'kh=', 'kz=', 'ikh=' or 'ikz='" ) fig, ax = self.output.figure_axe() ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) # no log(0) spect += 1e-15 if not linscale: spect_to_plot = np.log10(spect) else: spect_to_plot = spect im = ax.pcolormesh( xaxis, yaxis, spect_to_plot, cmap=cmap, vmin=vmin, vmax=vmax, shading="nearest", ) fig.colorbar(im) ax.set_title( f"{key_field} spatiotemporal spectra {equation}\n" f"tmin={tmin:.3f}, tmax={tmax:.3f}\n" + self.output.summary_simul ) # add dispersion relation : omega = N * kh / sqrt(kh ** 2 + kz ** 2) try: N = self.sim.params.N except AttributeError: return dkz_over_dkh = ( spectra_kzkhomega["kz_spectra"][1] / spectra_kzkhomega["kh_spectra"][1] ) if equation.startswith(r"$\omega"): if omega > 0 and omega <= N: ikz_disp = np.sqrt(N**2 / omega**2 - 1) / dkz_over_dkh * xaxis ax.plot(xaxis, ikz_disp, "k-", linewidth=2) elif equation.startswith(r"$k_h"): omega_disp = ikh / np.sqrt(ikh**2 + dkz_over_dkh**2 * xaxis**2) ax.plot(xaxis, omega_disp, "k-", linewidth=2) elif equation.startswith(r"$k_z"): omega_disp = xaxis / np.sqrt(xaxis**2 + dkz_over_dkh**2 * ikz**2) ax.plot(xaxis, omega_disp, "k-", linewidth=2) else: raise ValueError("wrong equation for dispersion relation") # set axis limits after plotting dispersion relation if xmax is None: xmax = xaxis.max() if ymax is None: ymax = yaxis.max() ax.set_xlim((0, xmax)) ax.set_ylim((0, ymax)) # add empirical omega and broadening if plot_omega_emp: ax.plot(xaxis, omega_emp / N, "r-", linewidth=2) ax.plot( xaxis, (omega_emp + 0.5 * delta_omega_emp) / N, "r--", linewidth=1 ) ax.plot( xaxis, (omega_emp - 0.5 * delta_omega_emp) / N, "r--", linewidth=1 ) return omega_emp, delta_omega_emp, omega_disp
[docs] def compute_spectra_urud(self, tmin=0, tmax=None, dtype=None): raise NotImplementedError
[docs] def compute_temporal_spectra( self, tmin=0, tmax=None, dtype=None, compute_urud=False ): """compute the temporal spectra by averaging over Fourier space""" tspectra = {} # compute kxkykzomega spectra spectra = self.compute_spectra(tmin=tmin, tmax=tmax, dtype=dtype) if compute_urud: spectra.update( self.compute_spectra_urud(tmin=tmin, tmax=tmax, dtype=dtype) ) # one-sided frequencies nomegas = (spectra["omegas"].size + 1) // 2 tspectra["omegas"] = spectra["omegas"][:nomegas] order = spectra["dims_order"] KX = spectra[f"K{order[-1]}_adim"] deltakx = 2 * pi / self.sim.params.oper.Lx kx_max = self.sim.params.oper.nx // 2 * deltakx # average over Fourier space (kx,ky,kz) for key, spectrum in spectra.items(): if not key.startswith("spectrum_"): continue tspectrum = self._sum_wavenumber(spectrum, KX, kx_max) # one-sided frequencies tspectrum_onesided = np.zeros(nomegas) tspectrum_onesided[0] = tspectrum[0] tspectrum_onesided[1:] = ( tspectrum[1:nomegas] + tspectrum[-1:-nomegas:-1] ) tspectra[key] = tspectrum_onesided # total kinetic energy if self.nb_dim == 3: tspectra["spectrum_K"] = 0.5 * ( tspectra["spectrum_vx"] + tspectra["spectrum_vy"] + tspectra["spectrum_vz"] ) else: tspectra["spectrum_K"] = 0.5 * ( tspectra["spectrum_ux"] + tspectra["spectrum_uy"] ) # potential energy try: N = self.sim.params.N tspectra["spectrum_A"] = 0.5 / N**2 * tspectra["spectrum_b"] except AttributeError: pass return tspectra
[docs] def _get_default_tmin_periodogram(self, tmin): paths_periodo = sorted(self.path_dir.glob("periodogram_*.h5")) if not paths_periodo: return tmin return float(paths_periodo[-1].name.split("_")[2])
[docs] def plot_temporal_spectra( self, key_field=None, tmin=None, tmax=None, xlim=None, ylim=None, dtype=None, xscale="log", coef_compensate=0, plot_resonant_modes=True, ): """plot the temporal spectra computed from the 4d spectra""" keys_plot = self.keys_fields.copy() if self.nb_dim == 3: keys_plot.extend(["Khd", "Khr", "Kp"]) if key_field is None: key_field = keys_plot[0] if key_field not in keys_plot: raise KeyError(f"possible keys are {keys_plot}") if tmax is None: tmax = self._get_default_tmax() if self.nb_dim == 3: # much simpler for 3d save_urud = True else: save_urud = False if tmin is None: tmin = self._get_default_tmin_periodogram(tmin) path_file = self._get_path_saved_tspectra(tmin, tmax, dtype, save_urud) if path_file.exists(): tspectra = self.load_temporal_spectra( tmin=tmin, tmax=tmax, dtype=dtype, save_urud=save_urud ) else: tspectra = self.save_temporal_spectra( tmin=tmin, tmax=tmax, dtype=dtype, save_urud=save_urud ) omegas = tspectra["omegas"] ylabel = "spectrum" if coef_compensate == 0: norm = 1.0 else: omegas_no_0 = omegas.copy() omegas_no_0[0] = 1e-15 norm = omegas_no_0**-coef_compensate norm[0] = np.nan ylabel = rf"$E(\omega) \omega^{{{coef_compensate}}}$" fig, ax = self.output.figure_axe() ax.set_xlabel(r"$\omega$") ax.set_ylabel(ylabel) ax.set_title( f"{key_field} temporal spectrum (tmin={tmin:.3f}, tmax={tmax:.3f})\n" + self.output.summary_simul ) ax.set_xscale(xscale) ax.set_yscale("log") # specific to strat try: N = self.sim.params.N except AttributeError: ax.plot( omegas, tspectra["spectrum_" + key_field] / norm, "k", linewidth=2, ) else: omegas = omegas / N if self.nb_dim == 3: # polo/toro/potential decomposition EKp = tspectra["spectrum_Khd"] + 0.5 * tspectra["spectrum_vz"] EKhr = tspectra["spectrum_Khr"] EK = EKhr + EKp ax.plot(omegas, EK / norm, "r", linewidth=2, label=r"$E_K$") try: projection = self.sim.params.projection except AttributeError: projection = None if projection != "poloidal": ax.plot( omegas, EKhr / norm, "r--", linewidth=1, label=r"$E_{K,toro}$", ) if projection != "toroidal": ax.plot( omegas, EKp / norm, "r-.", linewidth=1, label=r"$E_{K,polo}$", ) else: # kinetic energy EK = tspectra["spectrum_K"] ax.plot(omegas, EK / norm, "r", linewidth=2, label=r"$E_K$") EK_N = (EK / norm)[abs(omegas - 1).argmin()] # value at N EA = tspectra["spectrum_A"] ax.plot(omegas, EA / norm, "b", linewidth=2, label=r"$E_A$") ax.set_title( f"kinetic/potential energy spectrum (tmin={tmin:.3f}, tmax={tmax:.3f})\n" + self.output.summary_simul ) if plot_resonant_modes: if self.nb_dim == 3: aspect_ratio = self.sim.oper.Lx / self.sim.oper.Lz else: aspect_ratio = self.sim.oper.Lx / self.sim.oper.Ly def modes(nx, nz): return np.sqrt(nx**2 / (nx**2 + aspect_ratio**2 * nz**2)) nxs = np.arange(1, 11) modes_nz1 = modes(nxs, 1) modes_nz2 = modes(nxs, 2) modes_y = np.full_like(modes_nz1, fill_value=10 * EK_N) ax.plot(modes_nz1, modes_y, "o", label="modes $n_z=1$") ax.plot(modes_nz2, modes_y * 3, "o", label="modes $n_z=2$") # omega^-2 scaling omegas_scaling = np.arange(0.4, 1 + 1e-15, 0.01) scaling_y = EK_N * omegas_scaling ** (-2 + coef_compensate) ax.plot( omegas_scaling, scaling_y, "k--", label=r"$\propto \omega^{-2}$" ) # eye guide at N ax.axvline(1, linestyle="dotted") if xlim is not None: ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) # eye guide at omega_f (specific to some forcing types) forcing_type = self.sim.params.forcing.type if forcing_type in ["watu_coriolis", "milestone"]: if forcing_type == "watu_coriolis": omega_f = self.sim.params.forcing.watu_coriolis.omega_f elif forcing_type == "milestone": period = self.sim.forcing.get_info()["period"] omega_f = 2 * pi / period ax.axvline(omega_f / N, linestyle="dotted") elif forcing_type == "tcrandom_anisotropic": ymin, ymax = ax.get_ybound() factor = 2 angle = ensure_radians( self.params.forcing.tcrandom_anisotropic.angle ) tmp = self.params.forcing.tcrandom_anisotropic try: delta_angle = tmp.delta_angle except AttributeError: # loading old simul with delta_angle delta_angle = None if delta_angle is None: omega_f = N * np.sin(angle) ax.axvline(omega_f / N, linestyle="dotted") omega_tmp = omega_f / N else: delta_angle = ensure_radians(delta_angle) omega_fmin = N * np.sin(angle - 0.5 * delta_angle) omega_fmax = N * np.sin(angle + 0.5 * delta_angle) omegas_f = N * np.logspace(-3, 3, 1000) where = (omegas_f > omega_fmin) & (omegas_f < omega_fmax) ax.fill_between( omegas_f / N, ymin, ymax, where=where, alpha=0.5 ) omega_tmp = 0.5 * (omega_fmin + omega_fmax) / N ax.text( omega_tmp, factor * ymin, r"$\omega_{f}/N$", ha="center", va="center", size=10, ) ax.set_xlabel(r"$\omega/N$") ax.legend()
[docs] def load_temporal_spectra( self, tmin=None, tmax=None, dtype=None, save_urud=False ): """load temporal spectra from file""" tspectra = {} if tmin is None: tmin = self._get_default_tmin_periodogram(tmin) path_file = self._get_path_saved_tspectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "r") as file: for key in file.keys(): tspectra[key] = file[key][...] return tspectra
[docs] def save_temporal_spectra( self, tmin=0, tmax=None, dtype=None, save_urud=False ): """compute temporal spectra from files""" if tmax is None: tmax = self._get_default_tmax() tspectra = self.compute_temporal_spectra( tmin=tmin, tmax=tmax, dtype=dtype, compute_urud=save_urud ) path_file = self._get_path_saved_tspectra(tmin, tmax, dtype, save_urud) with h5py.File(path_file, "w") as file: file.attrs["tmin"] = tmin file.attrs["tmax"] = tmax for key, val in tspectra.items(): file.create_dataset(key, data=val) return tspectra
[docs] def _get_default_tmax(self): paths = list(self.path_dir.glob("rank*.h5")) if not paths: paths_periodo = list(self.path_dir.glob("periodogram_*.h5")) if paths_periodo: tmax = 0.0 for path in paths_periodo: with open_patient(path, "r") as file: tmax = max(tmax, file.attrs["tmax"]) return tmax return self.sim.params.time_stepping.t_end ranks = sorted({int([4:9]) for p in paths}) paths_1st_rank = sort_files_tmin( p for p in paths if"rank{ranks[0]:05}") ) with open_patient(paths_1st_rank[-1], "r") as file: return file["/times"][-1]
[docs] def get_spectra(self, tmin=0, tmax=None, dtype=None): save_urud = True path_file = self._get_path_saved_spectra(tmin, tmax, dtype, save_urud) if not path_file.exists(): if self.nb_dim == 3: return self.save_spectra_kzkhomega( tmin=tmin, tmax=tmax, dtype=dtype, save_urud=save_urud ) else: return self.save_spectra_kzkhomega( tmin=tmin, tmax=tmax, dtype=dtype ) spectra_kzkhomega = self.load_spectra_kzkhomega( tmin, tmax, dtype, save_urud ) tspectra = self.load_temporal_spectra(tmin, tmax, dtype, save_urud) return spectra_kzkhomega, tspectra