Source code for fluidsim.base.output.kolmo_law

"""Kolmogorov law 3d (:mod:`fluidsim.base.output.kolmo_law`)
==============================================================

Provides:

.. autoclass:: KolmoLaw
   :members:
   :private-members:

"""

import os
import itertools

import numpy as np
import h5py
import matplotlib.pyplot as plt
from warnings import warn


from fluiddyn.util import mpi

from fluidsim.base.output.base import SpecificOutput
from fluidsim.operators.coord_system3d import CoordSystem3DConverter
from fluidsim.operators.spatial_average3d import SpatialAverage


[docs] class KolmoLaw(SpecificOutput): r"""Kolmogorov law 3d. .. |J| mathmacro:: {\mathbf J} .. |Jk| mathmacro:: {\mathbf J_K} .. |Jp| mathmacro:: {\mathbf J_P} .. |v| mathmacro:: {\mathbf v} .. |x| mathmacro:: {\mathbf x} .. |r| mathmacro:: {\mathbf r} .. |Sum| mathmacro:: \sum_{\mathbf k} .. |bnabla| mathmacro:: \boldsymbol{\nabla} .. |Int| mathmacro:: \displaystyle\int .. |epsK| mathmacro:: \varepsilon_K .. |epsA| mathmacro:: \varepsilon_A .. |dd| mathmacro:: \mathrm{d} We want to test the prediction : .. math:: \bnabla \cdot \left( \Jk + \Jp \right) = -4 \left( \epsK + \epsA \right), where .. math:: \Jk(\r) \equiv \left\langle | \delta \v |^2 \delta \v \right\rangle_\x, \\ \Jp(\r) \equiv \frac{1}{N^2} \left\langle | \delta b |^2 \delta \v \right\rangle_\x. This output computes and saves the components of the vectors :math:`\J_\alpha` averaged over the azimuthal angle (i.e. as a function of :math:`r_h` and :math:`r_v`), as well as radially averaged (as a function of :math:`r`). """ _tag = "kolmo_law" _name_file = _tag + ".h5" @classmethod def _complete_params_with_default(cls, params): params.output.periods_save._set_attrib(cls._tag, 0) def __init__(self, output): params = output.sim.params try: period_save = params.output.periods_save.kolmo_law except AttributeError: period_save = 0.0 if params.ONLY_COARSE_OPER: self.coord_conv = None self.spatial_avg = None period_save = 0.0 if period_save == 0.0: super().__init__(output, period_save=0, arrays_1st_time=None) return # Get local coordinates X, Y, Z = output.sim.oper.get_XYZ_loc() Lx, Ly, Lz = params.oper.Lx, params.oper.Ly, params.oper.Lz # Initialize coordinate converter and spatial average operators self.coord_conv = CoordSystem3DConverter( X, Y, Z, Lx, Ly, Lz, shift_origin=True ) ratio_dr_to_dx = 1.0 ratio_drh_to_dx = 1.0 ratio_drv_to_dx = 1.0 self.spatial_avg = SpatialAverage( output.sim.oper, dr=ratio_dr_to_dx, drh=ratio_drh_to_dx, dz=ratio_drv_to_dx, shift_origin=True, ) # Store bin centers for saving arrays_1st_time = { "r_store": self.spatial_avg.r_centers, "rh_store": self.spatial_avg.rho_centers, "rv_store": self.spatial_avg.z_centers, } super().__init__( output, period_save=period_save, arrays_1st_time=arrays_1st_time, ) def _init_files(self, arrays_1st_time=None): if self.spatial_avg is None and self.coord_conv is None: return result = self.compute() if mpi.rank == 0: if not os.path.exists(self.path_file): self._create_file_from_dict_arrays( self.path_file, result, arrays_1st_time ) self.nb_saved_times = 1 else: with h5py.File(self.path_file, "r") as file: dset_times = file["times"] self.nb_saved_times = dset_times.shape[0] + 1 self._add_dict_arrays_to_file(self.path_file, result) self.t_last_save = self.sim.time_stepping.t
[docs] def _online_save(self): """Save the values at one time.""" if self.spatial_avg is None and self.coord_conv is None: return tsim = self.sim.time_stepping.t if tsim - self.t_last_save >= self.period_save: self.t_last_save = tsim result = self.compute() if mpi.rank == 0: self._add_dict_arrays_to_file(self.path_file, result) self.nb_saved_times += 1
[docs] def compute(self): """Compute the Kolmogorov law quantities at one time.""" state = self.sim.state params = self.sim.params state_phys = state.state_phys state_spect = state.state_spect keys_state_phys = state.keys_state_phys fft = self.sim.oper.fft kx = self.sim.oper.Kx ky = self.sim.oper.Ky kz = self.sim.oper.Kz # Get velocity fields letters = "xyz" fft_vi = [state_spect.get_var(f"v{letter}_fft") for letter in letters] vel = [state_phys.get_var(f"v{letter}") for letter in letters] # Compute kinetic energy K = sum(v**2 for v in vel) fft_K = fft(K) # Compute cross products v_i * v_j fft_vjvi = np.empty((3, 3), dtype=object) for ind_i, ind_j in itertools.product(range(3), repeat=2): vi = vel[ind_i] vj = vel[ind_j] fft_vjvi[ind_i, ind_j] = fft(vi * vj) # Compute mean kinetic energy if "b" in keys_state_phys: nrj_tot_A, nrj_tot_Kz, nrj_tot_Khr, nrj_tot_Khd = ( self.output.compute_energies() ) E_k_mean = nrj_tot_Kz + nrj_tot_Khr + nrj_tot_Khd else: E_k_mean = self.output.compute_energy() # Compute J_k in Fourier space Jk_r_fft = [None] * 3 for ind_i in range(3): tmp = 2 * fft_vi[ind_i] * fft_K.conj() for ind_j in range(3): tmp += 4 * fft_vi[ind_j] * fft_vjvi[ind_i, ind_j].conj() tmp = 1j * tmp.imag Jk_r_fft[ind_i] = tmp # Compute divergence of J_k Jk_r_fft_array = np.array(Jk_r_fft) divJk_fft = 1j * ( kx * Jk_r_fft_array[0] + ky * Jk_r_fft_array[1] + kz * Jk_r_fft_array[2] ) divJk = self.sim.oper.ifft(divJk_fft) # Convert to real space Jk_r = [self.sim.oper.ifft(Jk_r_fft[i]) for i in range(3)] # Compute second-order structure function val = sum(fft_vi[i] * fft_vi[i].conj() for i in range(3)) S2_k_r = 4 * E_k_mean - 2 * self.sim.oper.ifft(val) # If buoyancy field exists, compute J_p if "b" in keys_state_phys: b = state_phys.get_var("b") fft_b = state_spect.get_var("b_fft") b2 = b * b fft_b2 = fft(b2) # Compute mean buoyancy variance E_b_mean = nrj_tot_A * params.N**2 # Compute J_p Jp_r_fft = [None] * 3 fft_bv = [fft(b * vel[i]) for i in range(3)] for ind_i in range(3): mom = ( 4 * fft_bv[ind_i].conj() * fft_b + 2 * fft_b2.conj() * fft_vi[ind_i] ) mom = 1j * mom.imag Jp_r_fft[ind_i] = mom / (params.N**2) # Divergence of J_p Jp_r_fft_array = np.array(Jp_r_fft) divJp_fft = 1j * ( kx * Jp_r_fft_array[0] + ky * Jp_r_fft_array[1] + kz * Jp_r_fft_array[2] ) divJp = self.sim.oper.ifft(divJp_fft) # Convert to real space Jp_r = [self.sim.oper.ifft(Jp_r_fft[i]) for i in range(3)] # S2_p src = fft_b * fft_b.conj() S2_p_r = (4 * E_b_mean - 2 * self.sim.oper.ifft(src)) / (params.N**2) # Project onto coordinate system bases using CoordSystem3DConverter Jk_r_array = np.array(Jk_r) Jl_k = self.coord_conv.compute_radial_component( Jk_r_array[0], Jk_r_array[1], Jk_r_array[2] ) Jh_k, Jt_k, Jv_k = self.coord_conv.compute_cylindrical_components( Jk_r_array[0], Jk_r_array[1], Jk_r_array[2] ) # Azimuthal and radial averages results = { "Jl_k": Jl_k, "Jh_k": Jh_k, "Jv_k": Jv_k, "S2_k": S2_k_r, "divJ_k": divJk, } if "b" in keys_state_phys: Jp_r_array = np.array(Jp_r) Jl_p = self.coord_conv.compute_radial_component( Jp_r_array[0], Jp_r_array[1], Jp_r_array[2] ) Jh_p, Jt_p, Jv_p = self.coord_conv.compute_cylindrical_components( Jp_r_array[0], Jp_r_array[1], Jp_r_array[2] ) results.update( { "Jl_p": Jl_p, "Jh_p": Jh_p, "Jv_p": Jv_p, "S2_p": S2_p_r, "divJ_p": divJp, } ) # Compute radial and azimuthal averages using SpatialAverage averaged_results = {} for key, field in results.items(): _, avg_r = self.spatial_avg.compute_radial_average(field) averaged_results[f"{key}_r"] = avg_r _, _, avg_hv = self.spatial_avg.compute_azimuthal_average(field) averaged_results[f"{key}_hv"] = avg_hv return averaged_results
[docs] def load_temp_average(self, keys=None, tmin=None, tmax=None): """Load selected data and time average.""" results = {} with h5py.File(self.path_file, "r") as file: times = file["times"][...] if keys is None: keys = [ k for k in file.keys() if not any( k.startswith(begin) for begin in ["r", "info_", "times"] ) ] # Determine time range if tmax is None: tmax = times.max() imax_plot = np.argmax(times) else: imax_plot = np.argmin(abs(times - tmax)) tmax = times[imax_plot] if tmin is None: tmin = times.min() imin_plot = np.argmin(times) else: imin_plot = np.argmin(abs(times - tmin)) if imin_plot == imax_plot: if imin_plot == 0: imax_plot += 1 tmax = times[imax_plot] else: imin_plot -= 1 tmin = times[imin_plot] # Load and average data for key in keys: results[key] = np.mean(file[key][imin_plot:imax_plot], axis=0) return results, tmin, tmax
[docs] def plot_radial_dependencies( self, tmin=None, tmax=None, coef_comp3=1, coef_comp2=2 / 3, save=False, ): """Plot radial dependencies of Kolmogorov law quantities.""" self._raise_parallel_error("plot_radial_dependencies") state = self.sim.state params = self.sim.params keys_state_phys = state.keys_state_phys keys = [ "S2_k_r", "divJ_k_r", "Jl_k_r", ] if "b" in keys_state_phys: keys.extend(["S2_p_r", "divJ_p_r", "Jl_p_r"]) to_plot, tmin, tmax = self.load_temp_average(keys, tmin, tmax) dimless_num = self.sim.output.spatial_means.get_dimless_numbers_averaged( tmin=tmin, tmax=tmax )["dimensional"] try: eta = dimless_num["eta"] except KeyError: warn("KeyError: 'eta' not available; eta is set to unity") eta = 1 EK = dimless_num["EKh"] + dimless_num["EKz"] with h5py.File(self.path_file, "r") as file: r_store = np.array(file["r_store"]) # Compensated plots Jl_k_comp = -to_plot["Jl_k_r"] / (r_store**coef_comp3) divJ_k = to_plot["divJ_k_r"] S2_k_comp = to_plot["S2_k_r"] / (r_store**coef_comp2) # Theoretical values Jl_k_th = 4 / 3 * np.ones_like(r_store) S2_k_th = 22 / 3 * np.ones_like(r_store) EK_array = EK * np.ones_like(r_store) if "b" in keys_state_phys: Jl_p_comp = -to_plot["Jl_p_r"] / (r_store**coef_comp3) divJ_p = to_plot["divJ_p_r"] S2_p_comp = to_plot["S2_p_r"] / (r_store**coef_comp2) title = f"$n_x={params.oper.nx}$" # Plot J_KL fig1, ax1 = self.output.figure_axe() ax1.set_ylabel(r"$-J_{KL}(r)/r\epsilon$", fontsize="x-large") ax1.plot(r_store[1:] / eta, Jl_k_comp[1:], "b", label="Numerical result") if "b" in keys_state_phys: ax1.plot(r_store[1:] / eta, Jl_p_comp[1:], "g", label="$J_P$") ax1.plot(r_store[1:] / eta, Jl_k_th[1:], "r--", label="4/3 theoretical") ax1.set_title(f"$-J_{{KL}}(r)/r\\epsilon$, {title}", fontsize="x-large") ax1.set_xlabel("$r/\\eta$", fontsize="x-large") ax1.set_xscale("log") ax1.set_yscale("log") ax1.legend() plt.tight_layout() if save: plt.savefig("Jk_r_compensate.png", dpi=300) plt.show() # Plot div(J_KL) fig2, ax2 = self.output.figure_axe() ax2.set_ylabel(r"$-\nabla \cdot J_{KL}(r)/4\epsilon$", fontsize="x-large") ax2.plot( r_store[1:] / eta, -divJ_k[1:] / 4, "b", label="Numerical result" ) if "b" in keys_state_phys: ax2.plot( r_store[1:] / eta, -divJ_p[1:] / 4, "g", label="$\\nabla \\cdot J_P$", ) ax2.plot( r_store[1:] / eta, np.ones_like(r_store[1:]), "r--", label="1 theoretical", ) ax2.set_title( f"$-\\nabla \\cdot J_{{KL}}(r)/4\\epsilon$, {title}", fontsize="x-large", ) ax2.set_xlabel("$r/\\eta$", fontsize="x-large") ax2.set_xscale("log") ax2.set_yscale("log") ax2.legend() plt.tight_layout() if save: plt.savefig("divJk_r_comp.png", dpi=300) plt.show() # Plot S2_K fig3, ax3 = self.output.figure_axe() ax3.set_ylabel(r"$S_2^K(r)/(r^{2/3}\epsilon^{2/3})$", fontsize="x-large") ax3.plot(r_store[1:] / eta, S2_k_comp[1:], "b", label="Numerical result") if "b" in keys_state_phys: ax3.plot(r_store[1:] / eta, S2_p_comp[1:], "g", label="$S_2^P$") ax3.plot(r_store[1:] / eta, S2_k_th[1:], "r--", label="22/3 theoretical") ax3.plot(r_store[1:] / eta, EK_array[1:], "k--", label=r"$E_K$") ax3.set_title( f"$S_2^K(r)/(r^{{2/3}}\\epsilon^{{2/3}})$, {title}", fontsize="x-large", ) ax3.set_xlabel("$r/\\eta$", fontsize="x-large") ax3.set_xscale("log") ax3.set_yscale("log") ax3.legend() plt.tight_layout() if save: plt.savefig("S2k_r_comp.png", dpi=300) plt.show()
[docs] def plot_hv_dependencies(self, tmin=None, tmax=None, save=False): """Plot azimuthal (rho, z) dependencies of Kolmogorov law quantities.""" self._raise_parallel_error("plot_hv_dependencies") state = self.sim.state keys_state_phys = state.keys_state_phys params = self.sim.params keys = ["Jl_k_hv", "divJ_k_hv"] if "b" in keys_state_phys: keys.extend(["Jl_p_hv", "divJ_p_hv"]) to_plot, tmin, tmax = self.load_temp_average(keys, tmin, tmax) dimless_num = self.sim.output.spatial_means.get_dimless_numbers_averaged( tmin=tmin, tmax=tmax )["dimensional"] try: eta = dimless_num["eta"] except KeyError: warn("KeyError: 'eta' not available; eta is set to unity") eta = 1 with h5py.File(self.path_file, "r") as file: rh_store = np.array(file["rh_store"]) rv_store = np.array(file["rv_store"]) RH, RV = np.meshgrid(rh_store, rv_store) # Compute radius for normalization radius = np.sqrt(RH**2 + RV**2) Jk_l_comp = -to_plot["Jl_k_hv"] / (radius + 1e-14) divJk_hv = to_plot["divJ_k_hv"] if "b" in keys_state_phys: Jp_l_comp = -to_plot["Jl_p_hv"] / (radius + 1e-14) divJp_hv = to_plot["divJ_p_hv"] title = f"$n_x={params.oper.nx}$" # Plot J_KL(rho, z) fig1, ax1 = self.output.figure_axe() im = ax1.pcolormesh( RH[1:] / eta, RV[1:] / eta, -Jk_l_comp[1:], cmap="Blues", vmin=0.0, vmax=1.33, ) fig1.colorbar(im, ax=ax1) ax1.set_xlabel(r"$r_h/\eta$", fontsize="x-large") ax1.set_ylabel(r"$r_v/\eta$", fontsize="x-large") ax1.set_title( f"$-J_{{KL}}(r_h,r_v)/r\\epsilon$, {title}", fontsize="x-large" ) ax1.set_xscale("log") ax1.set_yscale("log") ax1.set_xlim(xmin=1) ax1.set_ylim(ymin=1) plt.tight_layout() if save: plt.savefig("Jk_l_hv.png", dpi=300) plt.show() # Plot div(J_KL)(rho, z) fig2, ax2 = self.output.figure_axe() im = ax2.pcolormesh( RH[1:] / eta, RV[1:] / eta, -divJk_hv[1:] / 4, cmap="Blues", vmin=0.0, vmax=1.0, ) fig2.colorbar(im, ax=ax2) ax2.set_xlabel(r"$r_h/\eta$", fontsize="x-large") ax2.set_ylabel(r"$r_v/\eta$", fontsize="x-large") ax2.set_title( f"$-\\nabla \\cdot J_{{KL}}(r_h,r_v)/4\\epsilon$, {title}", fontsize="x-large", ) ax2.set_xscale("log") ax2.set_yscale("log") ax2.set_xlim(xmin=1) ax2.set_ylim(ymin=1) plt.tight_layout() if save: plt.savefig("divJk_hv.png", dpi=300) plt.show() if "b" in keys_state_phys: # Plot J_PL(rho, z) fig3, ax3 = self.output.figure_axe() im = ax3.pcolormesh( RH[1:] / eta, RV[1:] / eta, -Jp_l_comp[1:], cmap="Greens", vmin=0.0, vmax=1.33, ) fig3.colorbar(im, ax=ax3) ax3.set_xlabel(r"$r_h/\eta$", fontsize="x-large") ax3.set_ylabel(r"$r_v/\eta$", fontsize="x-large") ax3.set_title( f"$-J_{{PL}}(r_h,r_v)/r\\epsilon$, {title}", fontsize="x-large", ) ax3.set_xscale("log") ax3.set_yscale("log") ax3.set_xlim(xmin=1) ax3.set_ylim(ymin=1) plt.tight_layout() if save: plt.savefig("Jp_l_hv.png", dpi=300) plt.show() # Plot div(J_PL)(rho, z) fig4, ax4 = self.output.figure_axe() im = ax4.pcolormesh( RH[1:] / eta, RV[1:] / eta, -divJp_hv[1:] / 4, cmap="Greens", vmin=0.0, vmax=1.0, ) fig4.colorbar(im, ax=ax4) ax4.set_xlabel(r"$r_h/\eta$", fontsize="x-large") ax4.set_ylabel(r"$r_v/\eta$", fontsize="x-large") ax4.set_title( f"$-\\nabla \\cdot J_{{PL}}(r_h,r_v)/4\\epsilon$, {title}", fontsize="x-large", ) ax4.set_xscale("log") ax4.set_yscale("log") ax4.set_xlim(xmin=1) ax4.set_ylim(ymin=1) plt.tight_layout() if save: plt.savefig("divJp_hv.png", dpi=300) plt.show()
[docs] def plot_Jhv_vector(self, tmin=None, tmax=None, save=False): """Plot vector field of J in (rho, z) plane.""" self._raise_parallel_error("plot_Jhv_vector") state = self.sim.state keys_state_phys = state.keys_state_phys params = self.sim.params keys = ["Jh_k_hv", "Jv_k_hv"] if "b" in keys_state_phys: keys.extend(["Jh_p_hv", "Jv_p_hv"]) to_plot, _, _ = self.load_temp_average(keys, tmin, tmax) Jk_v = to_plot["Jv_k_hv"] Jk_h = to_plot["Jh_k_hv"] with h5py.File(self.path_file, "r") as file: rh_store = np.array(file["rh_store"]) rv_store = np.array(file["rv_store"]) RH, RV = np.meshgrid(rh_store, rv_store) title = f"$n_x={params.oper.nx}$" fig1, ax1 = self.output.figure_axe() ax1.set_title(f"$-J_{{KL}}(r_h,r_v)$, {title}", fontsize="x-large") ax1.quiver(RH, RV, -Jk_v, -Jk_h, width=0.005) ax1.set_xlabel(r"$r_h$", fontsize="x-large") ax1.set_ylabel(r"$r_v$", fontsize="x-large") plt.tight_layout() if save: plt.savefig("Jk_vector_hv.png", dpi=300) plt.show() # Normalized version fig2, ax2 = self.output.figure_axe() ax2.set_title( f"Normalized $-J_{{KL}}(r_h,r_v)$, {title}", fontsize="x-large" ) RH_safe = np.where(RH != 0, RH, 1e-10) RV_safe = np.where(RV != 0, RV, 1e-10) ax2.quiver(RH, RV, -Jk_v / RV_safe, -Jk_h / RH_safe) ax2.set_xlabel(r"$r_h$", fontsize="x-large") ax2.set_ylabel(r"$r_v$", fontsize="x-large") plt.tight_layout() if save: plt.savefig("Jk_vector_hv_normalized.png", dpi=300) plt.show()