"""Normal mode decomposition for SW1L solvers
(:mod:`fluidsim.solvers.sw1l.output.normal_mode`)
=================================================
Provides:
.. autoclass:: NormalModeBase
:members:
:private-members:
.. autoclass:: NormalModeDecomposition
:members:
:private-members:
.. autoclass:: NormalModeDecompositionModified
:members:
:private-members:
"""
import numpy as np
from fluiddyn.util.compat import cached_property
from fluiddyn.util import mpi
[docs]
class NormalModeBase:
def __init__(self, output):
self.sim = output.sim
self.params = output.sim.params
self.oper = output.oper
f = self.params.f
c = self.params.c2**0.5
ck = c * self.oper.K_not0
if f == 0:
self.sigma = ck
else:
self.sigma = np.sqrt(f**2 + ck**2)
self.bvec_fft = None
self.it_bvec_fft_computed = -1
def compute(self):
if self.it_bvec_fft_computed != self.sim.time_stepping.it:
get_var = self.sim.state.get_var
if {"ap_fft", "am_fft"}.issubset(self.sim.state.keys_state_spect):
q_fft = get_var("q_fft")
ap_fft = get_var("ap_fft")
am_fft = get_var("am_fft")
bvec_fft = self.bvecfft_from_qapamfft(q_fft, ap_fft, am_fft)
else:
ux_fft = get_var("ux_fft")
uy_fft = get_var("uy_fft")
eta_fft = get_var("eta_fft")
bvec_fft = self.bvecfft_from_uxuyetafft(ux_fft, uy_fft, eta_fft)
self.bvec_fft = bvec_fft
self.it_bvec_fft_computed = self.sim.time_stepping.it
return self.bvec_fft
[docs]
def bvecfft_from_qapamfft(self, q_fft, ap_fft, am_fft):
r"""Compute normal mode vector :math:`\mathbf{B}`
with dimensions of velocity from diagonalized linear modes."""
c = self.params.c2**0.5
c2 = self.params.c2
K = self.oper.K_not0
sigma = self.sigma
q_fft = -q_fft * c / sigma
ap_fft = ap_fft * 2**0.5 * c2 / (sigma * K)
am_fft = am_fft * 2**0.5 * c2 / (sigma * K)
bvec_fft = np.array([q_fft, ap_fft, am_fft])
if mpi.rank == 0 or self.oper.is_sequential:
bvec_fft[:, 0, 0] = 0.0
return bvec_fft
[docs]
def bvecfft_from_uxuyetafft(self, ux_fft, uy_fft, eta_fft):
r"""Compute normal mode vector, :math:`\mathbf{B}`
with dimensions of velocity from primitive variables."""
q_fft, ap_fft, am_fft = self.oper.qapamfft_from_uxuyetafft(
ux_fft, uy_fft, eta_fft
)
return self.bvecfft_from_qapamfft(q_fft, ap_fft, am_fft)
[docs]
def compute_qda_energies_fft(self):
"""Compute quadratic geostrophic, divergent and ageostrophic energies."""
bvec_fft = self.compute()
q_fft = bvec_fft[0]
d_fft = 0.5 * (bvec_fft[1] - bvec_fft[2])
a_fft = 0.5 * (bvec_fft[1] + bvec_fft[2])
def energy(var_fft):
return 0.5 * np.abs(var_fft) ** 2
return energy(q_fft), energy(d_fft), energy(a_fft)
[docs]
def compute_qapam_energies_fft(self):
"""Compute quadratic geostrophic, and ageostrophic (+/-) energies."""
bvec_fft = self.compute()
def energy(var_fft):
return 0.5 * np.abs(var_fft) ** 2
return map(energy, bvec_fft)
[docs]
class NormalModeDecomposition(NormalModeBase):
def __init__(self, output):
super().__init__(output)
@cached_property
def qmat(self):
"""Compute Q matrix to transform q, ap, am (fft) -> b0, b+, b- (fft) with
dimensions of velocity.
"""
oper = self.oper
sigma = self.sigma
f = float(self.params.f)
c = self.params.c2**0.5
KX = oper.KX
KY = oper.KY
K = oper.K
K2 = oper.K2
K_not0 = oper.K_not0
ck = c * K_not0
qmat = np.array(
[
[
-1j * 2.0**0.5 * ck * KY,
+1j * f * KY + KX * sigma,
+1j * f * KY - KX * sigma,
],
[
+1j * 2.0**0.5 * ck * KX,
-1j * f * KX + KY * sigma,
-1j * f * KX - KY * sigma,
],
[2.0**0.5 * f * K, c * K2, c * K2],
]
) / (2.0**0.5 * sigma * K_not0)
if mpi.rank == 0 or oper.is_sequential:
qmat[:, :, 0, 0] = 0.0
return qmat
[docs]
def normalmodefft_from_keyfft(self, key):
"""Returns the normal mode decomposition for the state_spect key specified."""
if key == "div_fft":
key_modes, normal_mode_vec_fft_x = self.normalmodefft_from_keyfft(
"px_ux_fft"
)
key_modes, normal_mode_vec_fft_y = self.normalmodefft_from_keyfft(
"py_uy_fft"
)
normal_mode_vec_fft = normal_mode_vec_fft_x + normal_mode_vec_fft_y
else:
key_modes = np.array([["G", "A", "a"]])
row_index = {
"ux_fft": 0,
"uy_fft": 1,
"eta_fft": 2,
"px_ux_fft": 0,
"px_uy_fft": 1,
"px_eta_fft": 2,
"py_ux_fft": 0,
"py_uy_fft": 1,
"py_eta_fft": 2,
}
r = row_index[key]
normal_mode_vec_fft = np.einsum(
"i...,i...->i...", self.qmat[r], self.bvec_fft
)
if "px" in key:
for r in range(3):
normal_mode_vec_fft[r] = self.oper.pxffft_from_fft(
normal_mode_vec_fft[r]
)
elif "py" in key:
for r in range(3):
normal_mode_vec_fft[r] = self.oper.pyffft_from_fft(
normal_mode_vec_fft[r]
)
if "eta" in key:
normal_mode_vec_fft = normal_mode_vec_fft / self.params.c2**0.5
return key_modes, normal_mode_vec_fft
def normalmodephys_from_keyphys(self, key):
ifft2 = self.oper.ifft2
key_modes, normal_mode_vec_fft = self.normalmodefft_from_keyfft(
key + "_fft"
)
normal_mode_vec_phys = np.array(
[ifft2(normal_mode_vec_fft[i]) for i in range(3)]
)
return key_modes, normal_mode_vec_phys
def _group_matrix_using_dict(self, key_matrix, value_matrix, grouping):
value_dict = dict.fromkeys(grouping, 0.0)
n1, n2 = key_matrix.shape
for i in range(n1):
for j in range(n2):
k1 = key_matrix[i, j]
k3 = None
for k2 in grouping.keys():
if k1 in grouping[k2]:
k3 = k2
break
if k3 is None:
raise KeyError(
"Not sure which dyad group " + k1 + " belongs to"
)
value_dict[k3] += value_matrix[i, j]
new_matrix = np.array([value_dict[k] for k in value_dict.keys()])
new_keys = np.array([list(value_dict)])
return new_keys, new_matrix
def dyad_from_keyfft(self, conjugate=False, *keys_state_spect):
dyad_group = {
"GG": ["GG"],
"AG": ["GA", "AG"],
"aG": ["Ga", "aG"],
"AA": ["AA", "Aa", "aA", "aa"],
}
k1, k2 = keys_state_spect
normal_modes = dict()
if k1 != k2:
for k in keys_state_spect:
key_modes, normal_modes[k] = self.normalmodefft_from_keyfft(k)
else:
key_modes, normal_modes[k1] = self.normalmodefft_from_keyfft(k1)
normal_modes[k2] = normal_modes[k1]
key_modes_mat = np.char.add(key_modes.transpose(), key_modes)
if conjugate:
Ni = normal_modes[k1].conj()
Nj = normal_modes[k2]
else:
Ni = normal_modes[k1]
Nj = normal_modes[k2]
dyad_mat_fft = np.einsum("i...,j...->ij...", Ni, Nj)
del (normal_modes, Ni, Nj)
return self._group_matrix_using_dict(
key_modes_mat, dyad_mat_fft, dyad_group
)
def dyad_from_keyphys(self, *keys_state_phys):
dyad_group = {
"GG": ["GG"],
"AG": ["GA", "AG"],
"aG": ["Ga", "aG"],
"AA": ["AA", "Aa", "aA", "aa"],
}
k1, k2 = keys_state_phys
normal_modes = dict()
if k1 != k2:
for k in keys_state_phys:
key_modes, normal_modes[k] = self.normalmodephys_from_keyphys(k)
else:
key_modes, normal_modes[k1] = self.normalmodephys_from_keyphys(k1)
normal_modes[k2] = normal_modes[k1]
key_modes_mat = np.char.add(key_modes.transpose(), key_modes)
dyad_mat_phys = np.einsum(
"i...,j...->ij...", normal_modes[k1], normal_modes[k2]
)
del normal_modes
fft2 = self.oper.fft2
dyad_mat_fft = np.array(
[[fft2(dyad_mat_phys[i, j]) for j in range(3)] for i in range(3)]
)
for i in range(3):
for j in range(3):
self.oper.dealiasing(dyad_mat_fft[i, j])
del dyad_mat_phys
return self._group_matrix_using_dict(
key_modes_mat, dyad_mat_fft, dyad_group
)
def triad_from_keyfft(self, *keys_state_spect):
triad_group = {
"GGG": ["GGG"],
"AGG": ["AGG", "GAG", "GGA", "aGG", "GaG", "GGa"],
"GAAs": ["aaG", "aGa", "Gaa", "AAG", "AGA", "GAA"],
"GAAd": ["aAG", "AaG", "aGA", "AGa", "GaA", "GAa"],
"AAA": ["AAA", "aaa", "AAa", "AaA", "aAA", "aaA", "aAa", "Aaa"],
}
k1, k2, k3 = keys_state_spect
key_modes_1, normal_modes_1 = self.normalmodefft_from_keyfft(k1)
key_modes_23, normal_modes_23 = self.dyad_from_keyfft(False, k2, k3)
key_modes_mat = np.char.add(key_modes_1.transpose(), key_modes_23)
triad_mat = np.einsum(
"i...,j...->ij...", normal_modes_1.conj(), normal_modes_23
)
return self._group_matrix_using_dict(
key_modes_mat, triad_mat, triad_group
)
def triad_from_keyfftphys(self, key_state_spect, *keys_state_phys):
triad_group = {
"GGG": ["GGG"],
"AGG": ["AGG", "GAG", "GGA", "aGG", "GaG", "GGa"],
"GAAs": ["aaG", "aGa", "Gaa", "AAG", "AGA", "GAA"],
"GAAd": ["aAG", "AaG", "aGA", "AGa", "GaA", "GAa"],
"AAA": ["AAA", "aaa", "AAa", "AaA", "aAA", "aaA", "aAa", "Aaa"],
}
k1 = key_state_spect
k2, k3 = keys_state_phys
key_modes_1, normal_modes_1 = self.normalmodefft_from_keyfft(k1)
key_modes_23, normal_modes_23 = self.dyad_from_keyphys(k2, k3)
key_modes_mat = np.char.add(key_modes_1.transpose(), key_modes_23)
triad_mat = np.einsum(
"i...,j...->ij...", normal_modes_1.conj(), normal_modes_23
)
return self._group_matrix_using_dict(
key_modes_mat, triad_mat, triad_group
)
[docs]
class NormalModeDecompositionModified(NormalModeDecomposition):
def compute(self):
if self.it_bvec_fft_computed != self.sim.time_stepping.it:
get_var = self.sim.state.get_var
#
# FIXME:Does not work
#
# if {'ap_fft', 'am_fft'}.issubset(self.sim.state.keys_state_spect):
# q_fft = get_var('q_fft')
# ap_fft = get_var('ap_fft')
# am_fft = get_var('am_fft')
# a_fft = ap_fft + am_fft
# bvecrot_fft = self.bvecfft_from_qapamfft(q_fft, a_fft, a_fft)
# else:
rot_fft = self.sim.state.get_var("rot_fft")
uxr_fft, uyr_fft = self.oper.vecfft_from_rotfft(rot_fft)
eta_fft = get_var("eta_fft")
bvecrot_fft = self.bvecfft_from_uxuyetafft(uxr_fft, uyr_fft, eta_fft)
self.bvecrot_fft = bvecrot_fft
return super().compute()
[docs]
def normalmodefft_from_keyfft(self, key):
"""Returns the normal mode decomposition for the state_spect key specified."""
if key.endswith("urx_fft") or key.endswith("ury_fft"):
row_index = {
"urx_fft": 0,
"ury_fft": 1,
"px_urx_fft": 0,
"px_ury_fft": 1,
"py_urx_fft": 0,
"py_ury_fft": 1,
}
key_modes = np.array([["G", "A", "a"]])
r = row_index[key]
normal_mode_vec_fft = np.einsum(
"i...,i...->i...", self.qmat[r], self.bvecrot_fft
)
if "px" in key:
for r in range(3):
normal_mode_vec_fft[r] = self.oper.pxffft_from_fft(
normal_mode_vec_fft[r]
)
elif "py" in key:
for r in range(3):
normal_mode_vec_fft[r] = self.oper.pyffft_from_fft(
normal_mode_vec_fft[r]
)
return key_modes, normal_mode_vec_fft
else:
return super().normalmodefft_from_keyfft(key)