Source code for fluidsim.base.preprocess.pseudo_spect

"""Preprocessing for pseudo-spectral solvers (:mod:`fluidsim.base.preprocess.pseudo_spect`)
===========================================================================================

Provides:

.. autoclass:: PreprocessPseudoSpectral
   :members:
   :private-members:


"""

import numpy as np
from .base import PreprocessBase


[docs] class PreprocessPseudoSpectral(PreprocessBase): _tag = "pseudo_spectral" def __call__(self): """Preprocesses if enabled.""" super().__call__() if self.params.enable: if self.sim.params.forcing.enable: if "forcing" in self.params.init_field_scale: self.set_forcing_rate() self.normalize_init_fields() else: self.normalize_init_fields() self.set_forcing_rate() else: self.normalize_init_fields() self.sim.state.clear_computed() self.set_viscosity() self.output._save_info_solver_params_xml(replace=True)
[docs] def normalize_init_fields(self): """ A non-dimensionalization procedure for the initialized fields. Parameters ---------- init_field_scale : string (use 'energy', 'unity') Set quantity to normalize initialized fields with. """ state = self.sim.state scale = self.params.init_field_scale C = float(self.params.init_field_const) if scale == "energy": try: (Ek,) = self.output.compute_quad_energies() except: Ek = self.output.compute_energy() ux_fft = state.get_var("ux_fft") uy_fft = state.get_var("uy_fft") if Ek != 0.0: ux_fft = (C / Ek) ** 0.5 * ux_fft uy_fft = (C / Ek) ** 0.5 * uy_fft try: state.init_from_uxuyfft(ux_fft, uy_fft) except AttributeError: rot_fft = self.oper.rotfft_from_vecfft(ux_fft, uy_fft) state.init_statespect_from(rot_fft=rot_fft) elif scale == "enstrophy": omega_0 = self.output.compute_enstrophy() rot_fft = state.get_var("rot_fft") if omega_0 != 0.0: rot_fft = (C / omega_0) ** 0.5 * rot_fft state.init_from_rotfft(rot_fft) elif scale == "enstrophy_forcing": P = self.sim.params.forcing.forcing_rate k_f = self.oper.deltak * ( ( self.sim.params.forcing.nkmax_forcing + self.sim.params.forcing.nkmin_forcing ) // 2 ) omega_0 = self.output.compute_enstrophy() rot_fft = state.get_var("rot_fft") if omega_0 != 0.0: C_0 = omega_0 / (P ** (2.0 / 3) * k_f ** (4.0 / 3)) rot_fft = (C / C_0) ** 0.5 * rot_fft state.init_from_rotfft(rot_fft) elif scale == "unity": pass else: raise ValueError("Unknown initial fields scaling: ", scale)
[docs] def set_viscosity(self): r"""Based on - the initial total enstrophy, \Omega_0, or - the initial energy - the forcing rate, \epsilon the viscosity scale or Reynolds number is set. Parameters ---------- params.preprocess.viscosity_type : string Type/Order of viscosity desired params.preprocess.viscosity_scale : string Mean quantity to be scaled against params.preprocess.viscosity_const : float Calibration constant to set dissipative wave number Notes ----- Algorithm: Sets viscosity variable nu and reinitializes f_d array for timestepping """ params = self.params viscosity_type = params.viscosity_type viscosity_scale = params.viscosity_scale C = params.viscosity_const if viscosity_scale == "enstrophy": args = [self.output.compute_enstrophy()] elif viscosity_scale == "energy": args = [self.output.compute_energy()] elif viscosity_scale == "enstrophy_forcing": args = [ self.output.compute_enstrophy(), self.sim.params.forcing.forcing_rate, ] elif viscosity_scale == "forcing": args = [self.sim.params.forcing.forcing_rate] else: raise ValueError(f"Unknown viscosity scale: {viscosity_scale}") result = calcul_viscosity( C, viscosity_scale, viscosity_type, oper=self.oper, verbose=False, *args, ) for v in result.values(): attr, order, nu = v self.sim.params.__setattr__(attr, nu) self.sim.time_stepping.__init__(self.sim)
[docs] def set_forcing_rate(self): r"""Based on C, a non-dimensional ratio of forcing rate to one of the following forcing scales - the initial total energy, math:: E_0 - the initial total enstrophy, math:: \Omega_0 the forcing rate is set. Parameters ---------- params.preprocess.forcing_const : float Non-dimensional ratio of forcing_scale to forcing_rate params.preprocess.forcing_scale : string Mean quantity to be scaled against """ params = self.params forcing_scale = params.forcing_scale C = float(params.forcing_const) # Forcing wavenumber k_f = self.oper.deltak * ( ( self.sim.params.forcing.nkmax_forcing + self.sim.params.forcing.nkmin_forcing ) // 2 ) if forcing_scale == "unity": self.sim.params.forcing.forcing_rate = C elif forcing_scale == "energy": energy_0 = self.output.compute_energy() self.sim.params.forcing.forcing_rate = C * energy_0**1.5 * k_f elif forcing_scale == "enstrophy": omega_0 = self.output.compute_enstrophy() self.sim.params.forcing.forcing_rate = C * omega_0**1.5 / k_f**2 else: raise ValueError(f"Unknown forcing scale: {forcing_scale}") self.sim.forcing.__init__(self.sim)
def calcul_viscosity( viscosity_const, viscosity_scale, viscosity_type, *args, **kwargs ): """Calculates viscosity based on scaling formulae. Use this function to estimate viscosity before runtime. Parameters ---------- viscosity_const : scalar Calibration constant to set dissipative wave number viscosity_scale : string Mean quantity to be scaled against viscosity_type : string Type/Order of viscosity desired *args : scalar(s) Estimated value for viscosity scale oper : object of :class:`fluidsim.operators.operators.Operators`, optional nh, Lh, coef_dealiasing, nk_f : scalars, optional No. of grid points, length of the box, coeff. of dealiasing and forcing wavenumber index. verbose : bool, optional For verbose output Examples -------- >>> calcul_viscosity( ... 1, 'enstrophy', 'laplacian', 50., oper=sim.oper) >>> calcul_viscosity( ... 0.5, 'forcing', 'laplacian_hyper8', oper=sim.oper, verbose=False) >>> calcul_viscosity( ... 1, 'enstrophy_forcing', 'laplacian', 50., 1., ... nh=128, Lh=50, coef_dealiasing=2. / 3, nk_f=6) >>> calcul_viscosity( ... 0.785, 'forcing', 'hyper8', 1., ... nh=1920, Lh=50, coef_dealiasing=8. / 9, nk_f=6) """ if "verbose" in kwargs: verbose = kwargs["verbose"] else: verbose = True if "oper" in kwargs: oper = kwargs["oper"] coef_dealiasing = oper.coef_dealiasing nk_f = ( oper.params.forcing.nkmax_forcing + oper.params.forcing.nkmin_forcing ) // 2 delta_x = oper.deltax deltak = oper.deltak else: nh = kwargs["nh"] Lh = kwargs["Lh"] coef_dealiasing = kwargs["coef_dealiasing"] nk_f = kwargs["nk_f"] delta_x = Lh / nh deltak = 2 * np.pi / Lh # Smallest resolved scale k_max = np.pi / delta_x * coef_dealiasing # OR np.pi / k_d, the dissipative wave number C = viscosity_const length_scale = C * np.pi / k_max k_f = deltak * nk_f large_scale = np.pi / k_f k_diss = k_max / C / np.pi if verbose: print("Max. wavenumber =", np.pi / delta_x) print("Max. resolved wavenumber, k_max =", k_max) print("Grid spacing, delta_x =", delta_x) print("\nESTIMATED (P~eps)") print( f"Dissipation wavenumber, k_d = {k_diss}; k_d / k_f = {k_diss / k_f}" ) print( f"Dissipation length scale, L_d = {length_scale}; L_d / L_f = {length_scale / large_scale}" ) print("Viscosity scale:", viscosity_scale, "=", args) if len(args) == 0: raise ValueError("Expected values related to `viscosity_scale` as *args") if viscosity_scale == "enstrophy": omega_0 = args[0] eta = omega_0**1.5 time_scale = eta ** (-1.0 / 3) elif viscosity_scale == "energy": energy_0 = args[0] epsilon = energy_0 * (1.5) / large_scale time_scale = epsilon ** (-1.0 / 3) * length_scale ** (2.0 / 3) elif viscosity_scale == "enstrophy_forcing": omega_0 = args[0] eta = omega_0**1.5 t1 = eta ** (-1.0 / 3) # Energy dissipation rate epsilon = args[1] t2 = epsilon ** (-1.0 / 3) * length_scale ** (2.0 / 3) time_scale = min(t1, t2) elif viscosity_scale == "forcing": epsilon = args[0] time_scale = epsilon ** (-1.0 / 3) * length_scale ** (2.0 / 3) else: raise ValueError(f"Unknown viscosity scale: {viscosity_scale}") dict_visc = { "laplacian": ["nu_2", 2], "hyper4": ["nu_4", 4], "hyper8": ["nu_8", 8], "hypo": ["nu_m4", -4], } if verbose: if "oper" in kwargs: epsilon = oper.params.forcing.forcing_rate elif "eps" in kwargs: epsilon = kwargs["eps"] else: epsilon = 1.0 print("Dissipation, epsilon =", epsilon) kolmo_len = [] if not any([k in viscosity_type for k in dict_visc]): raise ValueError(f"Unknown viscosity type: {viscosity_type}") else: for k, v in dict_visc.items(): if k in viscosity_type: attr, order = v nu = length_scale**order / time_scale v.append(nu) if verbose: kolmo_len.append((nu**3 / epsilon) ** (1.0 / (3 * order - 2))) else: v.append(0.0) if verbose: length_scale = np.mean(kolmo_len) k_diss = 1.0 / length_scale print(f"\nCALCULATED (eps={epsilon})") print( f"Dissipation wavenumber, k_d = {k_diss}; k_d / k_f = {k_diss / k_f}" ) print( f"Dissipation length scale, L_d = {length_scale}; L_d / L_f = {length_scale / large_scale}" ) return dict_visc