Source code for fluidsim_core.output.phys_fields

"""Abstract base classes for Physical fields output
---------------------------------------------------

These classes can be used to define classes working together with
:class:`fluidsim_core.output.movies.MoviesBasePhysFields`.

.. autoclass:: PhysFieldsABC
   :members:
   :private-members:
   :undoc-members:

.. autoclass:: SetOfPhysFieldFilesABC
   :members:
   :private-members:
   :undoc-members:

"""

from abc import ABCMeta, abstractmethod
from typing import Optional
from pathlib import Path
import math

import numpy as np


[docs] class SetOfPhysFieldFilesABC(metaclass=ABCMeta): times: list
[docs] @abstractmethod def update_times(self): "Update the times of the files"
[docs] @abstractmethod def get_min_time(self): "Get minimum time"
[docs] class PhysFieldsABC(metaclass=ABCMeta): _equation: Optional[str] _can_plot_quiver: Optional[bool] set_of_phys_files: SetOfPhysFieldFilesABC
[docs] @abstractmethod def get_key_field_to_plot(self, forbid_compute=False, key_prefered=None): "Get the key corresponding to the field to be plotted"
[docs] @abstractmethod def get_field_to_plot( self, key=None, time=None, idx_time=None, equation=None, interpolate_time=True, skip_vars=(), ): """Get the field to be plotted in process 0."""
[docs] @abstractmethod def get_vector_for_plot( self, from_state=False, time=None, interpolate_time=True ): "Get the vector components"
[docs] def _set_title(self, ax, key, time, vmax=None): title = f"{key}, $t = {time:.3f}$" if vmax is not None: title += r", $|\vec{v}|_{max} = $" + f"{vmax:.3f}" ax.set_title(title)
[docs] @abstractmethod def _get_axis_data(equation=None): "Get x and y axes data"
[docs] def set_equation_crosssection(self, equation): """Set the equation defining the cross-section. Parameters ---------- equation : str The equation can be of the shape 'iz=2', 'z=1', ... """ self._equation = equation
[docs] def _init_skip_quiver(self): params = self.output.sim.params Lx = params.oper.Lx Ly = params.oper.Ly x, y = self._get_axis_data() # 4% of the Lx it is a great separation between vector arrows. try: delta_quiver = 0.04 * min(Lx, Ly) except AttributeError: skip = 1 else: skip = (len(x) / Lx) * delta_quiver skip = int(np.round(skip)) if skip < 1: skip = 1 self._skip_quiver = skip return skip
class SetOfPhysFieldFilesBase(SetOfPhysFieldFilesABC): @staticmethod @abstractmethod def time_from_path(path): "Get the time corresponding to a file" def __init__(self, path_dir=None, output=None): self.output = output if path_dir is None: if output is None: path_dir = Path.cwd() else: path_dir = Path(output.path_run) self.path_dir = Path(path_dir) self._glob_pattern = self._get_glob_pattern() self.update_times() def _get_glob_pattern(self): return "state_phys*.[hn]*" def update_times(self): """Initialize the times by globing and analyzing the file names.""" path_files = sorted(self.path_dir.glob(self._glob_pattern)) if hasattr(self, "path_files") and len(self.path_files) == len( path_files ): return self.path_files = sorted(path_files) self.times = np.array( [self.time_from_path(path) for path in self.path_files] ) def get_min_time(self): if hasattr(self, "times"): return self.times.min() return 0.0 def get_max_time(self): return self.times.max() def get_field_to_plot( self, time=None, idx_time=None, key=None, equation=None, interpolate_time=True, skip_vars=(), ): if time is None and idx_time is None: self.update_times() time = self.times[-1] # Assert files are available if self.times.size == 0: raise FileNotFoundError( f"No phys files were detected in directory: {self.path_dir}" ) if not interpolate_time and time is not None: idx, time_closest = self.get_closest_time_file(time) return self.get_field_to_plot( idx_time=idx, key=key, equation=equation, skip_vars=skip_vars ) if interpolate_time and time is not None: idx_closest, time_closest = self.get_closest_time_file(time) if math.isclose(time, time_closest) or self.times.size == 1: return self.get_field_to_plot( idx_time=idx_closest, key=key, equation=equation, skip_vars=skip_vars, ) if idx_closest == self.times.size - 1: idx0 = idx_closest - 1 idx1 = idx_closest elif time_closest < time or idx_closest == 0: idx0 = idx_closest idx1 = idx_closest + 1 elif time_closest > time: idx0 = idx_closest - 1 idx1 = idx_closest dt_save = self.times[idx1] - self.times[idx0] weight0 = 1 - np.abs(time - self.times[idx0]) / dt_save weight1 = 1 - np.abs(time - self.times[idx1]) / dt_save field0, time0 = self.get_field_to_plot( idx_time=idx0, key=key, equation=equation, skip_vars=skip_vars ) field1, time1 = self.get_field_to_plot( idx_time=idx1, key=key, equation=equation, skip_vars=skip_vars ) return field0 * weight0 + field1 * weight1, time return self._get_field_to_plot_from_file( self.path_files[idx_time], key, equation=equation, skip_vars=skip_vars ) def get_closest_time_file(self, time): """Find the index and value of the closest actual time of the field.""" idx = np.abs(self.times - time).argmin() return idx, self.times[idx] @abstractmethod def _get_field_to_plot_from_file( self, path_file, key, equation, skip_vars=() ): "Get a 2d field from a file"