"""Operators 2d (:mod:`fluidsim.operators.operators2d`)
=======================================================
Provides
.. autoclass:: OperatorsPseudoSpectral2D
:members:
:private-members:
"""
from warnings import warn
from random import uniform
import sys
import numpy as np
from transonic import boost, Array, Transonic
from fluiddyn.util import mpi
from fluidfft.fft2d.operators import OperatorsPseudoSpectral2D as _Operators
from fluidsim.base.params import Parameters
from ..base.setofvariables import SetOfVariables
from .. import _is_testing
from .base import OperatorBase
ts = Transonic()
if (
not ts.is_transpiling
and not ts.is_compiled
and not _is_testing
and "sphinx" not in sys.modules
):
warn(
"operators2d.py has to be pythranized to be efficient! "
"Install pythran and recompile."
)
elif ts.is_transpiling:
_Operators = object
Af = Array[np.float64, "2d"]
Ac = Array[np.complex128, "2d"]
@boost
def laplacian_fft(a_fft: Ac, Kn: Af):
"""Compute the n-th order Laplacian."""
return a_fft * Kn
@boost
def invlaplacian_fft(a_fft: Ac, Kn_not0: Af, rank: int):
"""Compute the n-th order inverse Laplacian."""
invlap_afft = a_fft / Kn_not0
if rank == 0:
invlap_afft[0, 0] = 0.0
return invlap_afft
@boost
def compute_increments_dim1(var: Af, irx: int):
"""Compute the increments of var over the dim 1."""
n1 = var.shape[1]
n1new = n1 - irx
# bug for gast 0.4.0 (https://github.com/serge-sans-paille/gast/issues/48)
inc_var = var[:, irx:n1] - var[:, 0:n1new]
return inc_var
if not ts.is_transpiling:
nb_proc = mpi.nb_proc
rank = mpi.rank
else:
nb_proc = 1
rank = 0
if nb_proc > 1:
MPI = mpi.MPI
comm = mpi.comm
[docs]
@boost
class OperatorsPseudoSpectral2D(_Operators, OperatorBase):
_has_to_dealiase: bool
where_dealiased: "uint8[:, :]"
KX: Af
KY: Af
deltax: float
deltay: float
@classmethod
def _create_default_params(cls):
params = Parameters(tag="params", attribs={"ONLY_COARSE_OPER": False})
cls._complete_params_with_default(params)
return params
[docs]
@staticmethod
def _complete_params_with_default(params):
"""This static method is used to complete the *params* container."""
attribs = {
"type_fft": "default",
"coef_dealiasing": 2.0 / 3,
"nx": 48,
"ny": 48,
"Lx": 8,
"Ly": 8,
"truncation_shape": "cubic",
"NO_SHEAR_MODES": False,
"NO_KY0": False,
}
params._set_child("oper", attribs=attribs)
def __init__(self, params):
self.params = params
self.axes = ("y", "x")
nx = int(params.oper.nx)
ny = int(params.oper.ny)
if params.oper.nx != nx:
raise ValueError(
f"params.oper.nx != int(params.oper.nx); ({params.oper.nx})"
)
if params.oper.ny != ny:
raise ValueError(
f"params.oper.ny != int(params.oper.ny); ({params.oper.ny})"
)
params.oper.nx = nx
params.oper.ny = ny
if params.ONLY_COARSE_OPER:
nx = ny = 4
super().__init__(
nx,
ny,
params.oper.Lx,
params.oper.Ly,
fft=params.oper.type_fft,
coef_dealiasing=params.oper.coef_dealiasing,
)
# compatibility for fluidfft <= 0.3.0
if not hasattr(self, "oper_fft") and hasattr(self, "opfft"):
self.oper_fft = self.opfft
self.Lx = self.lx
self.Ly = self.ly
try:
self.project_fft_on_realX = self._opfft.project_fft_on_realX
except AttributeError:
if self.is_sequential:
self.project_fft_on_realX = self.project_fft_on_realX_seq
else:
self.project_fft_on_realX = self.project_fft_on_realX_slow
if not self.is_sequential:
self.iKxloc_start, _ = self.oper_fft.get_seq_indices_first_K()
self.iKxloc_start_rank = np.array(comm.allgather(self.iKxloc_start))
nkx_loc_rank = np.array(comm.allgather(self.nkx_loc))
a = nkx_loc_rank
self.SAME_SIZE_IN_ALL_PROC = (a >= a.max()).all()
self._reinit_truncation()
try:
NO_SHEAR_MODES = self.params.oper.NO_SHEAR_MODES
except AttributeError:
pass
else:
if NO_SHEAR_MODES:
COND_NOSHEAR = abs(self.KX) == 0.0
self.where_dealiased = np.array(
np.logical_or(COND_NOSHEAR, self.where_dealiased),
dtype=np.uint8,
)
try:
NO_KY0 = self.params.oper.NO_KY0
except AttributeError:
pass
else:
if NO_KY0:
COND_NO_KY0 = abs(self.KY) == 0.0
self.where_dealiased = np.array(
np.logical_or(COND_NO_KY0, self.where_dealiased),
dtype=np.uint8,
)
def get_region_multiple_aliases(self):
aliases_x = abs(self.KX) >= 2 / 3 * self.deltakx * self.nx / 2
aliases_y = abs(self.KY) >= 2 / 3 * self.deltaky * self.ny / 2
return aliases_x & aliases_y
def dealiasing(self, *args):
if not self._has_to_dealiase:
return
for thing in args:
if isinstance(thing, SetOfVariables):
self.dealiasing_setofvar(thing)
elif isinstance(thing, np.ndarray):
self.dealiasing_variable(thing)
[docs]
@boost
def dealiasing_setofvar(self, sov: "complex128[][][]"):
"""Dealiasing of a setofvar arrays."""
if self._has_to_dealiase:
nk, n0, n1 = sov.shape
for i0 in range(n0):
for i1 in range(n1):
if self.where_dealiased[i0, i1]:
for ik in range(nk):
sov[ik, i0, i1] = 0.0
[docs]
def project_fft_on_realX_seq(self, f_fft):
"""Project the given field in spectral space such as its
inverse fft is a real field."""
nky_seq = self.shapeK_seq[0]
iky_ky0 = 0
iky_kyM = nky_seq // 2
ikx_kx0 = 0
# ikx_kxM = self.nkx_seq-1
ikx_kxM = self.shapeK_seq[1] - 1
# first, some values have to be real
f_fft[iky_ky0, ikx_kx0] = f_fft[iky_ky0, ikx_kx0].real
f_fft[iky_ky0, ikx_kxM] = f_fft[iky_ky0, ikx_kxM].real
f_fft[iky_kyM, ikx_kx0] = f_fft[iky_kyM, ikx_kx0].real
f_fft[iky_kyM, ikx_kxM] = f_fft[iky_kyM, ikx_kxM].real
# second, there are relations between some values
for ikyp in range(1, iky_kyM):
ikyn = nky_seq - ikyp
f_kp_kx0 = f_fft[ikyp, ikx_kx0]
f_kn_kx0 = f_fft[ikyn, ikx_kx0]
f_fft[ikyp, ikx_kx0] = (f_kp_kx0 + f_kn_kx0.conjugate()) / 2
f_fft[ikyn, ikx_kx0] = (
(f_kp_kx0 + f_kn_kx0.conjugate()) / 2
).conjugate()
f_kp_kxM = f_fft[ikyp, ikx_kxM]
f_kn_kxM = f_fft[ikyn, ikx_kxM]
f_fft[ikyp, ikx_kxM] = (f_kp_kxM + f_kn_kxM.conjugate()) / 2
f_fft[ikyn, ikx_kxM] = (
(f_kp_kxM + f_kn_kxM.conjugate()) / 2
).conjugate()
return f_fft
def project_fft_on_realX_slow(self, f_fft):
return self.fft(self.ifft(f_fft))
[docs]
def coarse_seq_from_fft_loc(self, f_fft, shapeK_loc_coarse):
"""Return a coarse field in K space."""
nkyc = shapeK_loc_coarse[0]
nkxc = shapeK_loc_coarse[1]
if nb_proc == 1:
nky = self.shapeK_seq[0]
fc_fft = np.empty([nkyc, nkxc], np.complex128)
for ikyc in range(nkyc):
if ikyc <= nkyc / 2:
iky = ikyc
else:
kynodim = ikyc - nkyc
iky = kynodim + nky
for ikxc in range(nkxc):
fc_fft[ikyc, ikxc] = f_fft[iky, ikxc]
return fc_fft
if not self.is_transposed:
raise NotImplementedError()
fc_trans = np.empty([nkxc, nkyc], np.complex128)
nky = self.shapeK_seq[1]
f1d_temp = np.empty([nkyc], np.complex128)
for ikxc in range(nkxc):
kx = self.deltakx * ikxc
rank_ikx, ikxloc, ikyloc = self.where_is_wavenumber(kx, 0.0)
if rank == rank_ikx:
# create f1d_temp
for ikyc in range(nkyc):
if ikyc <= nkyc / 2:
iky = ikyc
else:
kynodim = ikyc - nkyc
iky = kynodim + nky
f1d_temp[ikyc] = f_fft[ikxloc, iky]
if rank_ikx != 0:
# message f1d_temp
if rank == 0:
# print('f1d_temp', f1d_temp, f1d_temp.dtype)
comm.Recv(
[f1d_temp, MPI.DOUBLE_COMPLEX],
source=rank_ikx,
tag=ikxc,
)
elif rank == rank_ikx:
comm.Send([f1d_temp, MPI.DOUBLE_COMPLEX], dest=0, tag=ikxc)
if rank == 0:
# copy into fc_trans
fc_trans[ikxc] = f1d_temp.copy()
fc_fft = fc_trans.transpose()
return fc_fft
# def fft_loc_from_coarse_seq(self, fc_fft, shapeK_loc_coarse):
# """Return a large field in K space."""
# nkyc = shapeK_loc_coarse[0]
# nkxc = shapeK_loc_coarse[1]
# if nb_proc > 1:
# nky = self.shapeK_seq[0]
# f_fft = self.create_arrayK(value=0.)
# fc_trans = fc_fft.transpose()
# for ikxc in range(nkxc):
# kx = self.deltakx*ikxc
# rank_ikx, ikxloc, ikyloc = self.where_is_wavenumber(kx, 0.)
# fc1D = fc_trans[ikxc]
# if rank_ikx != 0:
# # message fc1D
# fc1D = np.ascontiguousarray(fc1D)
# if rank == 0:
# comm.Send(fc1D, dest=rank_ikx, tag=ikxc)
# elif rank == rank_ikx:
# comm.Recv(fc1D, source=0, tag=ikxc)
# if rank == rank_ikx:
# # copy
# for ikyc in range(nkyc):
# if ikyc <= nkyc/2:
# iky = ikyc
# else:
# kynodim = ikyc - nkyc
# iky = kynodim + nky
# f_fft[ikxloc, iky] = fc1D[ikyc]
# else:
# nky = self.shapeK_seq[0]
# nkx = self.shapeK_seq[1]
# f_fft = np.zeros([nky, nkx], np.complex128)
# for ikyc in range(nkyc):
# if ikyc <= nkyc/2:
# iky = ikyc
# else:
# kynodim = ikyc - nkyc
# iky = kynodim + nky
# for ikxc in range(nkxc):
# f_fft[iky, ikxc] = fc_fft[ikyc, ikxc]
# return f_fft
[docs]
def compute_increments_dim1(self, var, irx):
"""Compute the increments of var over the dim 1."""
return compute_increments_dim1(var, int(irx))
[docs]
def pdf_normalized(self, field, nb_bins=100):
"""Compute the normalized pdf"""
field_max = field.max()
field_min = field.min()
if nb_proc > 1:
field_max = comm.allreduce(field_max, op=MPI.MAX)
field_min = comm.allreduce(field_min, op=MPI.MIN)
range_min = field_min
range_max = field_max
if nb_proc == 1:
pdf, bin_edges = np.histogram(
field, bins=nb_bins, density=True, range=(range_min, range_max)
)
else:
hist, bin_edges = np.histogram(
field, bins=nb_bins, range=(range_min, range_max)
)
# memory leak related to this line for CPython 3.7.1
# hist = comm.allreduce(hist, op=MPI.SUM)
# workaround for CPython 3.7.0 and 3.7.1
tmp = np.empty_like(hist)
comm.Allreduce(hist, tmp, op=MPI.SUM)
hist = tmp
#
pdf = hist / ((bin_edges[1] - bin_edges[0]) * hist.sum())
return pdf, bin_edges
def where_is_wavenumber(self, kx_approx, ky_approx):
ikx_seq = int(np.round(kx_approx / self.deltakx))
if ikx_seq >= self.nkx_seq:
raise ValueError("not good :-) ikx_seq >= self.nkx_seq")
if self.is_sequential:
rank_k = 0
ikx_loc = ikx_seq
else:
if self.SAME_SIZE_IN_ALL_PROC:
rank_k = int(np.floor(float(ikx_seq) / self.nkx_loc))
if ikx_seq >= self.nkx_loc * mpi.nb_proc:
raise ValueError(
"not good :-) ikx_seq >= self.nkx_loc * mpi.nb_proc\n"
"ikx_seq, self.nkx_loc, mpi.nb_proc = "
"{}, {}, {}".format(ikx_seq, self.nkx_loc, mpi.nb_proc)
)
else:
rank_k = 0
while rank_k < self.nb_proc - 1 and (
not (
self.iKxloc_start_rank[rank_k] <= ikx_seq
and ikx_seq < self.iKxloc_start_rank[rank_k + 1]
)
):
rank_k += 1
ikx_loc = ikx_seq - self.iKxloc_start_rank[rank_k]
iky_loc = int(np.round(ky_approx / self.deltaky))
if iky_loc < 0:
iky_loc = self.nky_loc + iky_loc
if self.is_transposed:
ik0_loc = ikx_loc
ik1_loc = iky_loc
else:
ik0_loc = iky_loc
ik1_loc = ikx_loc
return rank_k, ik0_loc, ik1_loc
def uxuyfft_from_psifft(self, psi_fft):
px_psi_fft, py_psi_fft = self.gradfft_from_fft(psi_fft)
ux_fft = -py_psi_fft
uy_fft = px_psi_fft
return ux_fft, uy_fft
def rotfft_from_psifft(self, psi_fft):
return self.laplacian_fft(psi_fft)
[docs]
def pxffft_from_fft(self, f_fft):
"""Return the gradient of f_fft in spectral space."""
return 1j * self.KX * f_fft
[docs]
def pyffft_from_fft(self, f_fft):
"""Return the gradient of f_fft in spectral space."""
return 1j * self.KY * f_fft
def laplacian2_fft(self, a_fft):
warn("Use oper.laplacian_fft instead.", PendingDeprecationWarning)
return laplacian_fft(a_fft, self.K4)
def invlaplacian2_fft(self, a_fft):
warn("Use oper.invlaplacian_fft instead.", PendingDeprecationWarning)
return invlaplacian_fft(a_fft, self.K4_not0, rank)
[docs]
def laplacian_fft(self, a_fft, order=2, negative=False):
r"""Compute the n-th order Laplacian, :math:`\nabla^{n} \hat{a}`
Parameters
----------
a_fft : ndarray
order: int, {2, 4, 8}, optional
Order of the Laplacian operator
negative: bool, optional
Negative of the result.
"""
sign = 1j**order
if sign.imag != 0:
raise ValueError(f"Order={order} should be even!")
if negative:
sign *= -1
Kn = getattr(self, f"K{int(order)}")
# Avoid unnecessary multiplication by unity
if sign == 1:
return laplacian_fft(a_fft, Kn)
else:
return sign * laplacian_fft(a_fft, Kn)
[docs]
def invlaplacian_fft(self, a_fft, order=2, negative=False):
r"""Compute the n-th order inverse Laplacian, :math:`\nabla^{-n} \hat{a}`
Parameters
----------
a_fft : ndarray
order: int, {2, 4, 8}, optional
Order of the inverse Laplacian operator.
negative: bool, optional
Negative of the result.
"""
sign = 1.0 / 1j**order
if sign.imag != 0:
raise ValueError(f"Order={order} should be even!")
if negative:
sign *= -1
Kn_not0 = getattr(self, f"K{int(order)}_not0")
# Avoid unnecessary multiplication by unity
if sign == 1:
return invlaplacian_fft(a_fft, Kn_not0, rank)
else:
return sign * invlaplacian_fft(a_fft, Kn_not0, rank)
[docs]
def put_coarse_array_in_array_fft(
self, arr_coarse, arr, oper_coarse, shapeK_loc_coarse
):
"""Put the values contained in a coarse array in an array.
Both arrays are in Fourier space.
"""
if arr.ndim == 3:
if mpi.rank == 0:
if arr_coarse.ndim != 3:
raise ValueError
for ikey in range(arr.shape[0]):
if mpi.rank == 0:
arr2d_coarse = arr_coarse[ikey]
else:
arr2d_coarse = None
self.put_coarse_array_in_array_fft(
arr2d_coarse, arr[ikey], oper_coarse, shapeK_loc_coarse
)
return
nKyc, nKxc = shapeK_loc_coarse
if mpi.nb_proc > 1 and not self.is_sequential:
if not self.is_transposed:
raise NotImplementedError()
nKy = self.shapeK_seq[1]
if mpi.rank == 0:
fck_fft = arr_coarse.transpose()
for ikxc in range(nKxc):
kx = self.deltakx * ikxc
rank_ikx, ikxloc, ikyloc = self.where_is_wavenumber(kx, 0.0)
if mpi.rank == 0:
fc1D = fck_fft[ikxc]
if rank_ikx != 0:
# message fc1D
if mpi.rank == rank_ikx:
fc1D = np.empty([nKyc], dtype=np.complex128)
if mpi.rank == 0 or mpi.rank == rank_ikx:
fc1D = np.ascontiguousarray(fc1D)
if mpi.rank == 0:
mpi.comm.Send(
[fc1D, mpi.MPI.COMPLEX], dest=rank_ikx, tag=ikxc
)
elif mpi.rank == rank_ikx:
mpi.comm.Recv([fc1D, mpi.MPI.COMPLEX], source=0, tag=ikxc)
if mpi.rank == rank_ikx:
# copy
for ikyc in range(nKyc):
if ikyc <= nKyc / 2.0:
iky = ikyc
else:
kynodim = ikyc - nKyc
iky = kynodim + nKy
arr[ikxloc, iky] = fc1D[ikyc]
else:
nKy = self.shapeK_seq[0]
if not np.allclose(0.0, max(abs(arr_coarse[nKyc // 2, :]))):
raise ValueError("any(arr_coarse[nKyc//2] != 0)")
if not np.allclose(0.0, max(abs(arr_coarse[:, nKxc - 1]))):
raise ValueError("any(arr_coarse[:, nKxc-1] != 0)")
for ikyc in range(nKyc):
if ikyc <= nKyc / 2.0:
iky = ikyc
else:
kynodim = ikyc - nKyc
iky = kynodim + nKy
for ikxc in range(nKxc):
arr[iky, ikxc] = arr_coarse[ikyc, ikxc]
def get_grid1d_seq(self, axis="x"):
if axis not in ("x", "y"):
raise ValueError
if self.params.ONLY_COARSE_OPER:
number_points = getattr(self.params.oper, "n" + axis)
length = getattr(self, "L" + axis)
return np.linspace(0, length, number_points)
else:
return getattr(self, axis + "_seq")
@boost
def get_phases_random(self):
# Not supported by Pythran 0.9.5!
# alpha_x, alpha_y = np.random.uniform(-0.5, 0.5, 2)
alpha_x, alpha_y = tuple(uniform(-0.5, 0.5) for _ in range(2))
beta_x = alpha_x + 0.5 if alpha_x < 0 else alpha_x - 0.5
beta_y = alpha_y + 0.5 if alpha_y < 0 else alpha_y - 0.5
phase_alpha = (
alpha_x * self.deltax * self.KX + alpha_y * self.deltay * self.KY
)
phase_beta = (
beta_x * self.deltax * self.KX + beta_y * self.deltay * self.KY
)
return phase_alpha, phase_beta
# energy_arr = self.sum_wavenumbers(abs(arr)**2)
# energy_array_coarse_after = oper_coarse.sum_wavenumbers(
# abs(arr_coarse)**2)
# print('energy_array_coarse_after = ', energy_array_coarse_after)
# print('energy_arr = ', energy_arr)