Source code for pygetm.open_boundaries

import enum
from typing import Optional, Sequence, Callable, Iterable, Union, NamedTuple
import functools
import logging

import numpy as np

from pygetm import core
from .constants import (
    CENTERS,
    GRAVITY,
    INTERFACES,
    ZERO_GRADIENT,
    SPONGE,
    FLATHER_ELEV,
    FLATHER_TRANSPORT,
    CLAMPED,
    TimeVarying,
    FILL_VALUE,
    CellType,
)


[docs] class Side(enum.IntEnum): WEST = 1 NORTH = 2 EAST = 3 SOUTH = 4
[docs] class OpenBoundary: """A single open boundary. This defines the location of the open boundary, but not the type of boundary condition to apply to the different variables. For that, use :attr:`core.Array.open_boundaries`. """ def __init__( self, name: str, side: Side, l: int, mstart: int, mstop: int, type_2d: int, type_3d: int, ): """Create an open boundary. The supplied indices are global indices in the T grid. Args: name: unique name for this boundary side: side of the domain l: fixed index (i for left/right boundaries, j for top/bottom) mstart: start index (j for left/right boundaries, i for top/bottom) mstop: stop index (exclusive) type_2d: default type of boundary condition for 2D variables type_3d: default type of boundary condition for 3D variables """ side = Side(side) self.name = name self.side = side self.l = l self.mstart = mstart self.mstop = mstop self.mstep = 1 if mstop > mstart else -1 self.type_2d = type_2d self.type_3d = type_3d self.inflow_sign = 1.0 if side in (Side.WEST, Side.SOUTH) else -1.0 self.np = (mstop - mstart) // self.mstep ms = np.arange(mstart, mstop, self.mstep, dtype=np.intp) ls = np.broadcast_to(np.array(l, dtype=np.intp), ms.shape) if side in (Side.WEST, Side.EAST): self.i, self.j = ls, ms else: self.i, self.j = ms, ls def __repr__(self) -> str: return ( f"{__class__.__name__}({self.name!r}, {self.side.name}," f" {self.l}, {self.mstart}, {self.mstop})" )
[docs] def to_local_grid(self, grid: core.Grid) -> Optional["LocalOpenBoundary"]: """Convert global indices to local indices of the current subdomain.""" # Convert to indices in the T grid of the current subdomain INCLUDING halos xoffset = grid.tiling.xoffset - grid.halox yoffset = grid.tiling.yoffset - grid.haloy if self.side in (Side.WEST, Side.EAST): l_offset, m_offset, l_max, m_max = (xoffset, yoffset, grid.nx_, grid.ny_) else: l_offset, m_offset, l_max, m_max = (yoffset, xoffset, grid.ny_, grid.nx_) l = self.l - l_offset mstart_ = self.mstart - m_offset mstop_ = self.mstop - m_offset # Determine the part of the boundary that lies within the current subdomain mlast = mstop_ - self.mstep if ( l < 0 or l >= l_max or max(mstart_, mlast) < 0 or min(mstart_, mlast) >= m_max ): # Boundary lies completely outside current subdomain return None mstart = min(max(0, mstart_), m_max - 1) mstop = min(max(0, mlast), m_max - 1) + self.mstep mskip = (mstart - mstart_) // self.mstep assert mskip >= 0 local_bdy = LocalOpenBoundary( self.name, self.side, l, mstart, mstop, mskip, self.type_2d, self.type_3d ) if (grid.mask.all_values[local_bdy.slice_t] == CellType.UNRESOLVED).all(): return None return local_bdy
[docs] class LocalOpenBoundary(OpenBoundary): def __init__( self, name: str, side: Side, l: int, mstart: int, mstop: int, mskip: int, type_2d: int, type_3d: int, ): super().__init__(name, side, l, mstart, mstop, type_2d, type_3d) self.mskip = mskip mslice = slice(mstart, mstop, self.mstep) if side in (Side.WEST, Side.EAST): self.slice_t = (Ellipsis, mslice, l) self.slice_uv_in = (Ellipsis, mslice, l if side == Side.WEST else l - 1) else: self.slice_t = (Ellipsis, l, mslice) self.slice_uv_in = (Ellipsis, l if side == Side.SOUTH else l - 1, mslice)
[docs] def extract_inward( self, values: np.ndarray, start: int, stop: Optional[int] = None ) -> np.ndarray: """Extract one or more rows/columns on the T grid, inward from the open boundary. Args: values: 2D or 3D array of values on the T grid start: index of the first row/column to extract stop: index of the last row/column to extract (exclusive). If not given, only a single row/column is extracted. Returns: Extracted of values. This is a view into the input array. If a single row/column is extracted, the shape of the extracted values will be ``(..., np)``, where ``np`` is the number of points of the open boundary, and ``...`` represents any non-xy dimensions of the input array. If ``n = stop - start`` rows/columns are extracted, the shape will be ``(..., np, n)``. The last dimension then runs from the row/column closest to the boundary to the one furthest away. """ l_inward = {Side.WEST: 1, Side.EAST: -1, Side.SOUTH: 1, Side.NORTH: -1}[ self.side ] mslice = slice(self.mstart, self.mstop, self.mstep) ldim = -1 if self.side in (Side.WEST, Side.EAST) else -2 lstart = self.l + l_inward * start assert lstart >= 0 and lstart < values.shape[ldim] if stop is None: # Extract a single row/column lslice = lstart else: # Extract a range of rows/columns llast = self.l + l_inward * (stop - 1) assert llast >= 0 and llast < values.shape[ldim] lstop = None if llast == 0 and l_inward == -1 else llast + l_inward lslice = slice(lstart, lstop, l_inward) if self.side in (Side.WEST, Side.EAST): return values[..., mslice, lslice] else: if stop is not None: return np.swapaxes(values[..., lslice, mslice], -1, -2) return values[..., lslice, mslice]
[docs] def extract_uv_in(self, u: np.ndarray, v: np.ndarray) -> np.ndarray: """Extract velocity or transport across the boundary from components at velocity points (U or V) just inward from the boundary. Args: u: 2D or 3D component of velocity or transport in x-direction. It must be defined on the U grid. v: 2D or 3D component of velocity or transport in y-direction. It must be defined on the V grid. Returns: 2D or 3D array of velocity or transport across the boundary. """ uv = u if self.side in (Side.WEST, Side.EAST) else v return uv[self.slice_uv_in]
[docs] class BoundaryCondition: """Base class for an open boundary condition that can be applied to any open boundary of any array. Specific types of open boundary condition (clamped, zero-gradient, etc.) must subclass this base class and implement :meth:`get_updater` """
[docs] def initialize(self, grid: core.Grid): """Initialize the boundary condition. Subclasses for specific types of open boundary condition can perform their own initialization here, e.g., by creating auxiliary arrays. This method is called once for each open boundary of each array that uses this boundary condition. Args: grid: the T grid on which the boundaries are defined """ pass
[docs] def get_updater( self, boundary: OpenBoundary, array: core.Array, bdy: np.ndarray ) -> Callable[[], None]: """Return a function that updates the model values at the open boundary. Boundary conditions that use nearby interior values (e.g., zero gradient, flow-dependent relaxation) can only use points with mask=ACTIVE (wet and active). They must skip mask=BOUNDARY to avoid dependence on the order in which boundaries are updated. Args: boundary: the open boundary array: array with all model values bdy: prescribed values at the open boundary. If depth-explicit, its shape is (np, nz), otherwise (np,) """ raise NotImplementedError
[docs] def prepare_depth_explicit(self): pass
[docs] @staticmethod def get_setter( array: core.Array, slc: tuple[Union[slice, int], ...], where: Union[bool, np.ndarray] = True, ) -> Callable[[np.ndarray], None]: """Takes an array object and a slice, and returns a function that sets values in the array at the slice, respecting the array's mask and valid range. Args: array: array that will be updated slc: slice of the values in the array that will be updated where: boolean array indicating which points to update """ out = array.all_values[slc] kwargs = dict() where = (array.grid.mask.all_values[slc] != CellType.UNRESOLVED) & where if not where.all(): kwargs["where"] = np.broadcast_to(where, out.shape) valid_min = array.attrs.get("_minimum") if isinstance(valid_min, np.ndarray): valid_min = valid_min[slc] valid_max = array.attrs.get("_maximum") if isinstance(valid_max, np.ndarray): valid_max = valid_max[slc] if valid_min is not None and valid_max is not None: return functools.partial(np.clip, valid_min, valid_max, out, **kwargs) elif valid_min is not None: return functools.partial(np.maximum, valid_min, out=out, **kwargs) elif valid_max is not None: return functools.partial(np.minimum, valid_min, out=out, **kwargs) elif "where" in kwargs: return functools.partial(np.copyto, out, **kwargs) else: return functools.partial(out.__setitem__, (Ellipsis,))
[docs] class Clamped(BoundaryCondition): """Clamped boundary condition The model values at the open boundary are set to the prescribed values. """
[docs] def get_updater( self, boundary: OpenBoundary, array: core.Array, bdy: np.ndarray ) -> Callable[[], None]: return functools.partial(self.get_setter(array, boundary.slice_t), bdy.T)
[docs] class ZeroGradient(BoundaryCondition): """Zero-gradient boundary condition The model values at the open boundary are set to the values at the first point inward from the boundary. """
[docs] def get_updater( self, boundary: OpenBoundary, array: core.Array, bdy: np.ndarray ) -> Callable[[], None]: inner_mask = boundary.extract_inward(array.grid.mask.all_values, start=1) return functools.partial( self.get_setter(array, boundary.slice_t, inner_mask == CellType.ACTIVE), boundary.extract_inward(array.all_values, start=1), )
[docs] class Sponge(Clamped): """Sponge boundary condition, with ``n`` points inward from the open boundary being relaxed to values at the boundary. If flow-dependent relaxation is used (``tmrlx=True``), values at the boundary itself will blend the prescribed values and a weighted mean over the sponge zone. The fraction taken from the prescribed values will increase with increasing inward flow velocity. If flow-dependent relaxation is not used (``tmrlx=False``), values at the boundary will be set to the prescribed values. Relaxation rates for the ``n`` points are available as :attr:`sp`. They default to the formulation of `Martinsen & Engedahl (1987) <https://doi.org/10.1016/0378-3839(87)90028-7>`_ (Eq 3).""" def __init__( self, n: int = 3, tmrlx: bool = False, tmrlx_max: float = 0.25, tmrlx_min: float = 0.0, tmrlx_ucut: float = 0.02, tmrlx_umin: Optional[float] = None, ): """ Args: n: number of points in the sponge zone; the boundary itself is not included, so ``n=0`` is equivalent to a clamped boundary. tmrlx: make use of prescribed values at the open boundary dependent on flow direction. This is typically used to gradually disable use of prescribed values as the current switches from inflow to outflow over the boundary. tmrlx_max: maximum fraction to take from prescribed values at the open boundary. The remaining fraction will be taken from a weighted mean over the sponge zone. This maximum is reached at strong inflow (>= ``tmrlx_ucut``). Only used if ``tmrlx`` is ``True``. tmrlx_min: minimum fraction to take from prescribed values at the open boundary. The remaining fraction will be taken from a weighted mean over the sponge zone. This minimum is reached when water flows outward over the boundary (inflow <= ``tmrlx_umin``). Only used if ``tmrlx`` is ``True``. tmrlx_ucut: inward flow velocity (m s-1) at which the use of prescribed boundary values becomes highest. At this inflow, or higher, boundary values are for fraction ``tmrlx_max`` based on prescribed values; the remaining ``1 - tmrlx_max`` is taken from a weighted mean over the sponge zone. Only used if ``tmrlx`` is ``True``. tmrlx_umin: inward flow velocity (m s-1) at which the use of prescribed boundary values becomes minimal. Use negative values to indicate outflow. At the specified inflow value, or lower, boundary values are for fraction ``tmrlx_min`` based on prescribed values; the remaining ``1 - tmrlx_min`` is taken from a weighted mean over the sponge zone. If ``None``, ``tmrlx_umin`` is set to ``-0.25 * tmrlx_ucut``. Only used if ``tmrlx`` is ``True``. """ self.n = n self.tmrlx = tmrlx self.tmrlx_max = tmrlx_max self.tmrlx_min = tmrlx_min self.tmrlx_ucut = tmrlx_ucut if tmrlx_umin is None: tmrlx_umin = -0.25 * tmrlx_ucut self.tmrlx_umin = tmrlx_umin self.rlxcoef: Optional[core.Array] = None self.inflow: Optional[np.ndarray] = None # relaxation coefficient as per Martinsen & Engedahl (1987), Eq 3 # https://doi.org/10.1016/0378-3839(87)90028-7 # These represent the fraction of the cell's value that is replaced # by the value at the boundary, per timestep, as we move inward from # the boundary. self.sp = ((n - np.arange(n)) / (n + 1.0)) ** 2
[docs] def initialize(self, grid: core.Grid): """Create the relaxation coefficient array if using flow-dependent relaxation. This will be called for every open boundary of every array that uses this boundary condition, so we take care to only create the array once. """ if self.tmrlx and self.rlxcoef is None: self.rlxcoef = grid.array(z=CENTERS, on_boundary=True) grid.open_boundaries.uv_in.saved = True self.inflow = grid.open_boundaries.uv_in.all_values
[docs] def prepare_depth_explicit(self): """Update the relaxation coefficient based on the current flow velocity. This is done once per timestep and processes all open boundaries at once. """ if self.tmrlx: self.rlxcoef.all_values = (self.tmrlx_max - self.tmrlx_min) * np.clip( (self.inflow - self.tmrlx_umin) / (self.tmrlx_ucut - self.tmrlx_umin), 0.0, 1.0, ) + self.tmrlx_min
[docs] def get_updater( self, boundary: OpenBoundary, array: core.Array, bdy: np.ndarray ) -> Callable[[], None]: slicer = functools.partial(boundary.extract_inward, start=1, stop=self.n + 1) # Select sponge points that are wet and active (mask=ACTIVE) and connected # to the open boundary (mask=BOUNDARY) via similar such points. bdy_active = array.grid.mask.all_values[boundary.slice_t] == CellType.BOUNDARY sponge_active = slicer(array.grid.mask.all_values) == CellType.ACTIVE assert sponge_active.shape[-1] == self.n sponge_active[:, 0] &= bdy_active np.logical_and.accumulate(sponge_active, axis=-1, out=sponge_active) # Relax sponge zone to boundary values using coefficients sp sponge_target = array.all_values[boundary.slice_t][..., np.newaxis] collection = array.open_boundaries collection.add_relaxation(slicer, self.sp, sponge_target, sponge_active) if self.tmrlx: # Boundary values will blend prescribed values and a weighted mean # over the sponge zone. The fraction taken from the prescribed # values will increase with increasing inward flow velocity. w = np.where(sponge_active, self.sp, 0.0) wsum = w.sum(axis=1, keepdims=True) np.divide(w, wsum, out=w, where=wsum != 0.0) # The flow-dependent relaxation rlxcoef varies in time and will be # updated by prepare_depth_explicit. It covers all boundaries; here # we obtain its slice corresponding to the current boundary. rlxcoef = self.rlxcoef.all_values[boundary.slice_bdy].T return functools.partial( self.update_boundary, slicer(array.all_values), bdy.T, rlxcoef, w, w != 0.0, (w == 0.0).all(axis=1), self.get_setter(array, boundary.slice_t), ) else: # Clamp boundary to prescribed values return super().get_updater(boundary, array, bdy)
[docs] @staticmethod def update_boundary( sponge_values: np.ndarray, bdy_values: np.ndarray, rlxcoef: np.ndarray, w: np.ndarray, nonzerow: np.ndarray, clamp: np.ndarray, bdy_setter: Callable[[np.ndarray], None], ): """Update model values at open boundary. The sponge zone update is done from :meth:`ArrayOpenBoundary.update`, via the relaxation term added in :meth:`get_updater` Args: sponge_values: current model values in the sponge zone (inward from open boundary). (nz x np x nsponge) bdy_values: prescribed values at open boundary (nz x np) rlxcoef: fraction of model values at the open boundary to be taken from prescribed values. The remainder will be based on a weighted mean over the sponge. (nz x np) w: weight for sponge mean used as part of new model values at the open boundary. (np x nsponge) nonzerow: boolean array indicating where w is non-zero. Cells with zero weight will be excluded from the sponge mean in order to avoid blending in NaNs. clamp: boolean array indicating where no sponge mean is available. In these cells, the model values will be clamped to the prescribed values. bdy_setter: function that updates values at the boundary, respecting their valid range and mask """ sponge_mean = (w * sponge_values).sum(axis=-1, where=nonzerow) blend = rlxcoef * bdy_values + (1.0 - rlxcoef) * sponge_mean bdy_setter(np.where(clamp, bdy_values, blend))
[docs] class Flather(BoundaryCondition): """Flather boundary condition for elevation. This infers the elevation at the open boundary from prescribed elevation and from the difference between prescribed and modelled velocity across the boundary. See also: Flather, R.A. (1976) A tidal model of the north-west European continental shelf. Mem. Soc. R. Sci. Liege 6 (10), 141-164. Blayo, E., & Debreu, L. (2005). Revisiting open boundary conditions from the point of view of characteristic variables. Ocean Modelling, 9(3), 231–252. `10.1016/j.ocemod.2004.07.001 <https://doi.org/10.1016/j.ocemod.2004.07.001>`_ """ def __init__(self, transport: bool = False): """ Args: transport: use prescribed transport at the boundary instead of prescribed velocity """ self.transport = transport
[docs] def get_updater( self, boundary: OpenBoundary, array: core.Array, bdy: np.ndarray ) -> Callable[[], None]: return functools.partial( self.update_transport if self.transport else self.update_velocity, bdy, boundary.UV, boundary.flow_ext, array.grid.Dclip.all_values[boundary.slice_t], boundary.inflow_sign, self.get_setter(array, boundary.slice_t), )
[docs] @staticmethod def update_velocity( z_ext: np.ndarray, tp: np.ndarray, vel_ext: np.ndarray, D: np.ndarray, inflow_sign: float, setter: Callable[[np.ndarray], None], ): setter(z_ext - inflow_sign * (tp - vel_ext * D) / np.sqrt(D * GRAVITY))
[docs] @staticmethod def update_transport( z_ext: np.ndarray, tp: np.ndarray, tp_ext: np.ndarray, D: np.ndarray, inflow_sign: float, setter: Callable[[np.ndarray, np.ndarray], None], ): setter(z_ext - inflow_sign * (tp - tp_ext) / np.sqrt(D * GRAVITY))
[docs] class ArrayOpenBoundary: """Single open boundary for a single array. The type of open boundary condition is accessible as :attr:`type`. The values prescribed at the boundary are accessible as :attr:`values`. """ def __init__( self, open_boundaries: "LocalOpenBoundaryCollection", boundary: OpenBoundary, prescribed_values: np.ndarray, model_values: np.ndarray, type: BoundaryCondition, ): self._make_bc = open_boundaries._make_bc self._boundary = boundary self._prescribed_values = prescribed_values self._model_values = model_values self._type = type @property def type(self) -> BoundaryCondition: return self._type @type.setter def type(self, value: Union[int, BoundaryCondition]): self._type = self._make_bc(value) @property def values(self) -> np.ndarray: return self._prescribed_values
[docs] class Relaxation(NamedTuple): slicer: Callable[[np.ndarray], np.ndarray] weights: np.ndarray target: np.ndarray local: np.ndarray where: np.ndarray
[docs] class ArrayOpenBoundaries: """All open boundaries for a single array. The values prescribed at all boundaries combined are accessible as :attr:`values`. The type of open boundary condition can be set for all boundaries at once via the :attr:`type` property. """ __slots__ = "_array", "values", "_bdy", "updaters", "relaxation" def __init__(self, array: core.Array, type=None): self._array = array self.values = array.grid.array( name=f"{array.name}_bdy", z=array.z, on_boundary=True, fill_value=array.fill_value, attrs={ "_time_varying": array.attrs.get("_time_varying", TimeVarying.MICRO) }, ) if type is not None: type = array.grid.open_boundaries._make_bc(type) self._bdy: list[ArrayOpenBoundary] = [] for bdy in array.grid.open_boundaries: self._bdy.append( ArrayOpenBoundary( array.grid.open_boundaries, bdy, self.values.all_values[bdy.slice_bdy], array.all_values[bdy.slice_t], type or (bdy.type_2d if array.ndim == 2 else bdy.type_3d), ) ) self.updaters: list[Callable[[], None]] = [] self.relaxation: list[Relaxation] = [] array.grid.open_boundaries._arrays.append(self) def _set_type(self, value: Union[int, BoundaryCondition]): value = self._array.grid.open_boundaries._make_bc(value) for bdy in self._bdy: bdy.type = value type = property(fset=_set_type)
[docs] def initialize(self) -> Iterable[BoundaryCondition]: bcs: set[BoundaryCondition] = set() for bdy in self._bdy: bdy._type.initialize(self._array.grid) self.updaters.append( bdy._type.get_updater( bdy._boundary, self._array, bdy._prescribed_values ) ) bcs.add(bdy._type) if self.relaxation: # Enumerate relaxation terms beyond the open boundary itself # (e.g., sponge zones) and normalize the weights so that the # combined relaxation from all open boundaries can at most exactly # replace the model values at the boundary (max total weight = 1) weight_sum = np.zeros(self.values.grid.mask.all_values.shape) for relax in self.relaxation: local_weight_sum = relax.slicer(weight_sum) local_weight_sum += relax.weights for relax in self.relaxation: relax.weights[...] /= np.maximum(relax.slicer(weight_sum), 1.0) return bcs
[docs] def add_relaxation( self, slicer: Callable[[np.ndarray], np.ndarray], weights: np.ndarray, target: np.ndarray, where: np.ndarray, ): """Add a boundary-influenced relaxation term, e.g., a sponge zone. Args: slicer: function that extracts the relaxation area from the model values. It must take a 2D or 3D array of model values and return the slice corrresponding to the relaxation area. weights: fraction of the model values replaced by relaxation. target: target values to relax towards. where: boolean array indicating where the relaxation term should be applied. """ local = slicer(self._array.all_values) mask = slicer(self._array.grid.mask.all_values) where = (mask == CellType.ACTIVE) & where weights = np.where(where, weights, 0.0) self.relaxation.append(Relaxation(slicer, weights, target, local, where))
[docs] def update(self): """Update the tracer at the open boundaries""" for updater in self.updaters: updater() deltas = [r.weights * (r.target - r.local) for r in self.relaxation] for relax, delta in zip(self.relaxation, deltas): np.add(relax.local, delta, out=relax.local, where=relax.where)
[docs] class GlobalOpenBoundaryCollection(Sequence[OpenBoundary]): def __init__( self, nx: int, ny: int, logger: logging.Logger, x: Optional[np.ndarray] = None, y: Optional[np.ndarray] = None, lon: Optional[np.ndarray] = None, lat: Optional[np.ndarray] = None, ): self.nx = nx self.ny = ny self.logger = logger self.allow_on_land = False self._boundaries: list[OpenBoundary] = [] self._x = x self._y = y self._lon = lon self._lat = lat @property def i(self) -> np.ndarray: initial = np.empty((0,), dtype=np.intp) return np.concatenate([initial] + [b.i for b in self._boundaries]) @property def j(self) -> np.ndarray: initial = np.empty((0,), dtype=np.intp) return np.concatenate([initial] + [b.j for b in self._boundaries]) @property def lon(self) -> Optional[np.ndarray]: return None if self._lon is None else self._lon[self.j, self.i] @property def lat(self) -> Optional[np.ndarray]: return None if self._lat is None else self._lat[self.j, self.i] @property def x(self) -> Optional[np.ndarray]: return None if self._x is None else self._x[self.j, self.i] @property def y(self) -> Optional[np.ndarray]: return None if self._y is None else self._y[self.j, self.i]
[docs] def add_by_index( self, side: Side, l: int, mstart: int, mstop: int, type_2d: int, type_3d: int, name: Optional[str] = None, ): """Note that l, mstart, mstop are 0-based indices of a T point in the global domain. mstop indicates the upper limit of the boundary - it is the first index that is EXcluded. """ along_y = side in (Side.WEST, Side.EAST) if along_y: l_max, m_max = (self.nx, self.ny) else: l_max, m_max = (self.ny, self.nx) assert mstart != mstop mstep = 1 if mstop > mstart else -1 mlast = mstop - mstep assert mstart >= 0 and mstart < m_max assert mlast >= 0 and mlast < m_max assert l >= 0 and l < l_max if side in (Side.WEST, Side.SOUTH): assert l < l_max - 1, "No water points on the interior side of boundary" else: assert l > 0, "No water points on the interior side of boundary" if name is None: name = str(len(self._boundaries)) mmin = min(mstart, mlast) mmax = max(mstart, mlast) for b in self._boundaries: current_along_y = b.side in (Side.WEST, Side.EAST) if along_y == current_along_y: if l == b.l: ostart = max(mmin, min(b.mstart, b.mstop - b.mstep)) ostop = min(mmax, max(b.mstart, b.mstop - b.mstep)) + 1 overlap = ostop - ostart if overlap > 0: raise Exception( f"New boundary {name} overlaps in {overlap} points" f" with existing boundary {b.name}" ) else: cross = ( l >= min(b.mstart, b.mstop - b.mstep) and l <= max(b.mstart, b.mstop - b.mstep) and b.l >= mmin and b.l <= mmax ) if cross: i, j = (l, b.l) if along_y else (b.l, l) raise Exception( f"New boundary {name} crosses existing boundary {b.name}" f" at i={i}, j={j}" ) bdy = OpenBoundary(name, side, l, mstart, mstop, type_2d, type_3d) self._boundaries.append(bdy)
[docs] def add_top_boundary( self, name: str, j: int, istart: int, istop: int, type_2d: int, type_3d: int ): """Add an open boundary with the model exterior above and the model interior below. This is a northern open boundary in a spherical domain. """ self.add_by_index(Side.NORTH, j, istart, istop, type_2d, type_3d, name=name)
[docs] def add_bottom_boundary( self, name: str, j: int, istart: int, istop: int, type_2d: int, type_3d: int ): """Add an open boundary with the model exterior below and the model interior above. This is a southern open boundary in a spherical domain. """ self.add_by_index(Side.SOUTH, j, istart, istop, type_2d, type_3d, name=name)
[docs] def add_left_boundary( self, name: str, i: int, jstart: int, jstop: int, type_2d: int, type_3d: int ): """Add an open boundary with the model exterior to the left and the model interior to the right. This is a western open boundary in a spherical domain. """ self.add_by_index(Side.WEST, i, jstart, jstop, type_2d, type_3d, name=name)
[docs] def add_right_boundary( self, name: str, i: int, jstart: int, jstop: int, type_2d: int, type_3d: int ): """Add an open boundary with the model exterior to the right and the model interior to the left. This is an eastern open boundary in a spherical domain. """ self.add_by_index(Side.EAST, i, jstart, jstop, type_2d, type_3d, name=name)
[docs] def clear(self): """Delete all open boundaries.""" self._boundaries.clear()
[docs] def adjust_mask(self, mask: np.ndarray): """Adjust the global mask for open boundaries This assigns mask values: * BOUNDARY: masked T point * MIRROR_INT: velocity point in between open boundary T points * MIRROR_EXT: velocity point just outside the open boundary Args: mask: writeable mask defined on the supergrid, with shape (1+2*ny, 1+2*nx) """ assert mask.shape == (1 + 2 * self.ny, 1 + 2 * self.nx) umask = mask[1::2, 0::2] vmask = mask[0::2, 1::2] tmask = mask[1::2, 1::2] for boundary in self._boundaries: l = boundary.l mslice = slice(boundary.mstart, boundary.mstop, boundary.mstep) if boundary.side in (Side.WEST, Side.EAST): bdy_tmask = tmask[mslice, l] bdy_uvmask = umask[mslice, l if boundary.side == Side.WEST else l + 1] else: bdy_tmask = tmask[l, mslice] bdy_uvmask = vmask[l if boundary.side == Side.SOUTH else l + 1, mslice] wet = bdy_tmask != CellType.UNRESOLVED if not wet.all(): self.logger.log( logging.WARNING if self.allow_on_land else logging.ERROR, f"Open boundary {boundary.name}: {wet.size - wet.sum()} of" f" {wet.size} points of this {boundary.side.name.capitalize()}ern" " boundary are on land", ) if not self.allow_on_land: raise Exception() # The open boundary points themselves (T grid) bdy_tmask[wet] = CellType.BOUNDARY # Velocity points that lie just outside the T points of the open boundary bdy_uvmask[wet] = CellType.MIRROR_EXT # Velocity points that lie in between open boundary T points tbdy = tmask == CellType.BOUNDARY umask[:, 1:-1][tbdy[:, :-1] & tbdy[:, 1:]] = CellType.MIRROR_INT vmask[1:-1, :][tbdy[:-1, :] & tbdy[1:, :]] = CellType.MIRROR_INT
def __getitem__(self, key: int) -> OpenBoundary: return self._boundaries[key] def __len__(self) -> int: return len(self._boundaries) def __iter__(self): return iter(self._boundaries)
[docs] class LocalOpenBoundaryCollection(Sequence[LocalOpenBoundary]): def __init__( self, global_collection: GlobalOpenBoundaryCollection, grid: core.Grid, logger: logging.Logger, ): """Freeze the open boundary collection. Drop those outside the current subdomain. """ self.logger = logger self.np = 0 self.np_glob = 0 self._boundaries: list[LocalOpenBoundary] = [] self._arrays: list[ArrayOpenBoundaries] = [] self.sponge = Sponge() self.zero_gradient = ZeroGradient() self.bcs: set[BoundaryCondition] = set() # Indices of T point in open boundary into arrays that INCLUDE halos. # We start with an empty array to support later concatenation in # subdomains without any open boundary all_i = [np.empty((0,), dtype=np.intp)] all_j = [np.empty((0,), dtype=np.intp)] side2count = {Side.WEST: 0, Side.NORTH: 0, Side.EAST: 0, Side.SOUTH: 0} self.local_to_global: list[slice] = [] bdy_types_2d = {} for global_boundary in global_collection._boundaries: boundary = global_boundary.to_local_grid(grid) if boundary is not None: # This boundary falls at least partially in the local subdomain self._boundaries.append(boundary) side2count[boundary.side] += 1 # Determine start/stop indices in arrays with all local boundary points boundary.start = self.np boundary.stop = self.np + boundary.np boundary.slice_bdy = (slice(boundary.start, boundary.stop), Ellipsis) # Determine start/stop indices in arrays with all global boundary points start_glob = self.np_glob + boundary.mskip stop_glob = start_glob + boundary.np if self.local_to_global and self.local_to_global[-1].stop == start_glob: # Attach to previous boundary by dropping previous slice # and adopting its start index start_glob = self.local_to_global.pop().start self.local_to_global.append(slice(start_glob, stop_glob)) all_i.append(boundary.i) all_j.append(boundary.j) if boundary.type_2d not in bdy_types_2d: bdy_types_2d[boundary.type_2d] = self._make_bc(boundary.type_2d) boundary.type_2d = bdy_types_2d[boundary.type_2d] self.np += boundary.np self.np_glob += global_boundary.np grid.open_boundaries = self # Local indices of open boundary points within local subdomain # (T grid, for arrays that INclude halos) self.i = np.concatenate(all_i, dtype=np.intp) self.j = np.concatenate(all_j, dtype=np.intp) self.slice = (Ellipsis, self.j, self.i) assert ( global_collection.allow_on_land or (grid.mask.all_values[self.slice] == CellType.BOUNDARY).all() ) # Global indices of open boundary points within local subdomain # (T grid, for arrays that EXclude halos) self.i_glob = self.i - grid.halox + grid.tiling.xoffset self.j_glob = self.j - grid.haloy + grid.tiling.yoffset self.logger.info( f"{sum(side2count.values())} open boundaries" f" ({side2count[Side.WEST]} West, {side2count[Side.NORTH]} North," f" {side2count[Side.EAST]} East, {side2count[Side.SOUTH]} South)" ) if self.np > 0: if self.np == self.np_glob: # This subdomain includes all open boundaries in full assert ( len(self.local_to_global) == 1 and self.local_to_global[0].start == 0 and self.local_to_global[0].stop == self.np_glob ) self.local_to_global = None else: # This subdomain includes part of the open boundaries slices = ", ".join( [f"[{s.start}:{s.stop}]" for s in self.local_to_global] ) self.logger.info(f"global-to-local open boundary map: {slices}") if grid.ugrid: self._configure_mirroring(grid) # Longitude/latitude of open boundary points if grid.lon is not None: self.lon = grid.array( name="lon_bdy", on_boundary=True, fill=grid.lon.all_values[self.j, self.i], attrs=dict(_time_varying=False), ) if grid.lat is not None: self.lat = grid.array( name="lat_bdy", on_boundary=True, fill=grid.lat.all_values[self.j, self.i], attrs=dict(_time_varying=False), ) if grid.nz: # Vertical coordinates of open boundary points. # These will be updated by indexing into the full zc and zf # after every update of surface elevation/water depth/cell thickness kwargs = dict(fill_value=FILL_VALUE, units="m", on_boundary=True) self.zc = grid.array( name="zc_bdy", z=CENTERS, long_name="height", attrs=grid.zc.attrs, **kwargs, ) self.zf = grid.array( name="zf_bdy", z=INTERFACES, long_name="interface height", attrs=grid.zf.attrs, **kwargs, ) # Depth-explicit inward velocity at the open boundaries self.uv_in = grid.array(z=CENTERS, on_boundary=True) # Prescribed depth-averaged or depth-integrated velocity at the open boundaries self.u = grid.array(name="u_bdy", on_boundary=True) self.v = grid.array(name="v_bdy", on_boundary=True) if grid.rotation is not None and self.np > 0: # Prescribed geocentric velocity (eastwards/northwards) needs to be rotated # to model grid. Create rotator and arrays to hold the rotated velocity. self._rotator = core.Rotator(grid.rotation.all_values[self.j, self.i]) self.u_rot = grid.array(name="u_rot_bdy", on_boundary=True) self.v_rot = grid.array(name="v_rot_bdy", on_boundary=True) else: # Model grid is geocentric - no rotation of prescribed velocities necessary self._rotator = None self.u_rot = self.u self.v_rot = self.v def _make_bc(self, value: Union[int, BoundaryCondition]) -> BoundaryCondition: if isinstance(value, BoundaryCondition): return value if value == ZERO_GRADIENT: return self.zero_gradient elif value == SPONGE: return self.sponge elif value == CLAMPED: return Clamped() elif value == FLATHER_ELEV: return Flather() elif value == FLATHER_TRANSPORT: return Flather(transport=True) else: raise Exception(f"Unknown boundary type {value} specified") def _configure_mirroring(self, grid: core.Grid): """Set up indices for mirroring across boundaries on U and V grids""" assert grid.ugrid is not None and grid.vgrid is not None nx, ny = grid.nx_, grid.ny_ class Mirror: def __init__(self, to_self: bool = False): self.i_in = [] self.j_in = [] self.i_out = [] self.j_out = [] self.to_self = to_self def append(self, i_in, j_in, i_out, j_out, where, check=None, value=None): i_in, j_in, i_out, j_out = np.broadcast_arrays(i_in, j_in, i_out, j_out) keep = (i_in >= 0) & (j_in >= 0) & (i_in < nx) & (j_in < ny) keep &= (i_out >= 0) & (j_out >= 0) & (i_out < nx) & (j_out < ny) keep &= where if keep.any(): self.i_in.append(i_in[keep]) self.j_in.append(j_in[keep]) self.i_out.append(i_out[keep]) self.j_out.append(j_out[keep]) if check is not None: assert (check[self.j_out[-1], self.i_out[-1]] == value).all() def get_slices(self) -> Optional[tuple[tuple, tuple]]: if self.i_in: i_in = np.concatenate(self.i_in, dtype=np.intp) j_in = np.concatenate(self.j_in, dtype=np.intp) i_out = np.concatenate(self.i_out, dtype=np.intp) j_out = np.concatenate(self.j_out, dtype=np.intp) if self.to_self: while True: imatch = i_in == i_out[:, np.newaxis] jmatch = j_in == j_out[:, np.newaxis] (ind_out, ind_in) = (imatch & jmatch).nonzero() if ind_out.size == 0: break i_in[ind_in] = i_in[ind_out] j_in[ind_in] = j_in[ind_out] return (Ellipsis, j_in, i_in), (Ellipsis, j_out, i_out) mirror_U = Mirror(to_self=True) mirror_V = Mirror(to_self=True) mirror_TU = Mirror() mirror_TV = Mirror() mirror_T = Mirror(to_self=True) tmask = grid.mask.all_values umask = grid.ugrid.mask.all_values vmask = grid.vgrid.mask.all_values for boundary in self._boundaries: # Select the T points from the open boundary that are actually wet # (some may be on land with mask = UNRESOLVED) tsel = tmask[boundary.j, boundary.i] == CellType.BOUNDARY # Velocity points that lie WITHIN the open boundary (mask = MIRROR_INT), # that is, they lie in between tracer points with mask = BOUNDARY. # Values at these points will be mirrored from the interior velocity point. # We look 1 point beyond the start and stop of the boundary within the # current subdomain to catch cases where (a) the boundary extends into the # next subdomain and (b) the current boundary neighbors another boundary # Note that the values in boundary.{i,j} may have stride -1, which is why # we use min()/max() to determine the range. if boundary.side in (Side.WEST, Side.EAST): j = np.arange(max(boundary.j.min() - 1, 0), boundary.j.max() + 1) i_out = boundary.i[0] i_in = i_out + {Side.EAST: -1, Side.WEST: 1}[boundary.side] mirror_V.append( i_in, j, i_out, j, vmask[j, i_out] == CellType.MIRROR_INT ) else: i = np.arange(max(boundary.i.min() - 1, 0), boundary.i.max() + 1) j_out = boundary.j[0] j_in = j_out + {Side.NORTH: -1, Side.SOUTH: 1}[boundary.side] mirror_U.append( i, j_in, i, j_out, umask[j_out, i] == CellType.MIRROR_INT ) # Velocity points OUTSIDE AND ALONG the open boundary (mask = MIRROR_EXT) # Values at these points will be mirrored from either the neighboring # inner velocity point, or from inner T point (boundary.i, boundary.j) # [e.g., elevations at mask = BOUNDARY] if boundary.side in (Side.WEST, Side.EAST): j = boundary.j i_in = boundary.i[0] + {Side.WEST: 0, Side.EAST: -1}[boundary.side] i_out = boundary.i[0] + {Side.WEST: -1, Side.EAST: 0}[boundary.side] mirror_U.append(i_in, j, i_out, j, tsel, umask, CellType.MIRROR_EXT) mirror_TU.append( boundary.i, j, i_out, j, tsel, umask, CellType.MIRROR_EXT ) else: i = boundary.i j_in = boundary.j[0] + {Side.SOUTH: 0, Side.NORTH: -1}[boundary.side] j_out = boundary.j[0] + {Side.SOUTH: -1, Side.NORTH: 0}[boundary.side] mirror_V.append(i, j_in, i, j_out, tsel, vmask, CellType.MIRROR_EXT) mirror_TV.append( i, boundary.j, i, j_out, tsel, vmask, CellType.MIRROR_EXT ) if boundary.side in (Side.WEST, Side.EAST): j = boundary.j i_in = boundary.i[0] + {Side.WEST: 1, Side.EAST: -1}[boundary.side] i_out = boundary.i[0] mirror_T.append(i_in, j, i_out, j, tsel) else: i = boundary.i j_in = boundary.j[0] + {Side.SOUTH: 1, Side.NORTH: -1}[boundary.side] j_out = boundary.j[0] mirror_T.append(i, j_in, i, j_out, tsel) grid.ugrid._mirrors[grid.ugrid] = mirror_U.get_slices() grid.vgrid._mirrors[grid.vgrid] = mirror_V.get_slices() grid._mirrors[grid.ugrid] = mirror_TU.get_slices() grid._mirrors[grid.vgrid] = mirror_TV.get_slices() grid._mirrors[grid] = mirror_T.get_slices()
[docs] def start( self, U: Optional[core.Array] = None, V: Optional[core.Array] = None, uk: Optional[core.Array] = None, vk: Optional[core.Array] = None, ): for boundary in self._boundaries: if U is not None: # Depth-integrated/depth-averaged velocity perpendicular # to the open boundary for Flather boundary conditions # Modelled depth-integrated velocity (slice from full U/V) boundary.UV = boundary.extract_uv_in(U.all_values, V.all_values) # Prescribed depth-integrated or depth-averaged velocity if boundary.side in (Side.EAST, Side.WEST): boundary.flow_ext = self.u_rot[boundary.slice_bdy] else: boundary.flow_ext = self.v_rot[boundary.slice_bdy] if uk is not None: # Depth-explicit horizontal model velocities for sponge BCs # Velocity perpendicular to the open boundary (slice from full u/v) boundary.uv = boundary.extract_uv_in(uk.all_values, vk.all_values).T # Inward velocity (to be calculated from uv and inward sign) boundary.uv_in = self.uv_in.all_values[boundary.slice_bdy] for array_open_boundaries in self._arrays: self.bcs.update(array_open_boundaries.initialize())
def __getitem__(self, key: int) -> LocalOpenBoundary: return self._boundaries[key] def __len__(self) -> int: return len(self._boundaries) def __iter__(self): return iter(self._boundaries) @property def local_to_global_indices(self) -> np.ndarray: """Indices of local open boundary points in a global open boundary array.""" indices = np.arange(self.np, dtype=np.intp) if self.local_to_global is not None: i = 0 for s in self.local_to_global: indices[i : i + s.stop - s.start] = np.arange(s.start, s.stop) i += s.stop - s.start assert i == self.np return indices
[docs] def prepare(self, macro_active: bool): """Prepare open boundary forcing. This includes rotating prescribed velocity fields, as well as extracting depth-explicit horizontal velocities from which each class of open boundary condition can calculate derived metrics.""" if self._rotator: # Rotate prescribed geocentric velocity to model grid u_rot, v_rot = self._rotator(self.u.all_values, self.v.all_values) self.u_rot.all_values[:] = u_rot self.v_rot.all_values[:] = v_rot if macro_active: if self.uv_in.saved: # Set modelled depth-explicit inward velocity for all boundaries # combined by looping over individual boundaries and assigning # to their velocity slices for boundary in self._boundaries: np.multiply(boundary.uv, boundary.inflow_sign, out=boundary.uv_in) for bc in self.bcs: bc.prepare_depth_explicit()