from typing import Optional, Union
import logging
import numpy as np
import numpy.typing as npt
from .constants import CENTERS, INTERFACES, TimeVarying, ZERO_GRADIENT, FILL_VALUE
from . import core
from . import _pygetm
[docs]
class Base:
"""Base class. It is responsible for updating layer thicknesses
``hn`` for all grids provided to initialize"""
logger: logging.Logger
def __init__(self, nz: int):
if nz <= 0:
raise Exception("Number of layers nz must be a positive number")
self.nz = nz
self.prescribed_hn: Optional[npt.NDArray[np.float64]] = None
[docs]
def initialize(
self, tgrid: core.Grid, *other_grids: core.Grid, logger: logging.Logger
):
self.logger = logger
self.tgrid = tgrid
self.other_grids = other_grids
[docs]
def update(self, timestep: float):
"""Update layer thicknesses hn for all grids"""
raise NotImplementedError
[docs]
class PerGrid(Base):
"""Base class for vertical coordinate types that apply the same operation
to every grid."""
[docs]
def initialize(self, *grids: core.Grid, logger: logging.Logger):
super().initialize(*grids, logger=logger)
self.grid_info = [self.prepare_update_args(grid) for grid in grids]
[docs]
def prepare_update_args(self, grid: core.Grid):
"""Prepare grid-specific information that will be passed as
arguments to __call__"""
return (grid.Dclip.all_values, grid.hn.all_values, grid.mask.all_values)
[docs]
def update(self, timestep: float):
"""Update all grids"""
if self.prescribed_hn is not None:
self.tgrid.hn.all_values = self.prescribed_hn
else:
self(*self.grid_info[0])
if self.other_grids:
for gi in self.grid_info[1:]:
self(*gi)
self.tgrid.hhalf.all_values = 0.5 * (
self.tgrid.ho.all_values + self.tgrid.hn.all_values
)
[docs]
def __call__(
self,
D: np.ndarray,
out: Optional[np.ndarray] = None,
where: Union[bool, npt.ArrayLike] = True,
):
"""Calculate layer thicknesses
Args:
D: water depths (m)
out: array to hold layer thicknesses. It must have shape ``(nz,) + D.shape``
where: locations where to compute thicknesses (typically: water points).
It must be broadcastable to the shape of ``D``
"""
raise NotImplementedError(
"Classes that inherit from PerGrid must implement __call__"
)
[docs]
def calculate_sigma(nz: int, ddl: float = 0.0, ddu: float = 0.0) -> np.ndarray:
"""Return sigma thicknesses (fraction of column depth) of all layers,
using a formulation that allows zooming towards the surface and bottom.
Args:
nz: number of layers
ddl: zoom factor at bottom (0: no zooming, 2: strong zooming)
ddu: zoom factor at surface (0: no zooming, 2: strong zooming)
"""
if ddl <= 0.0 and ddu <= 0.0:
return np.broadcast_to(1.0 / nz, (nz,))
ddl, ddu = max(ddl, 0.0), max(ddu, 0.0)
# This zooming routine is from Antoine Garapon, ICCH, DK
ga = np.linspace(0.0, 1.0, nz + 1)
ga[1:-1] = np.tanh((ddl + ddu) * ga[1:-1] - ddl) + np.tanh(ddl)
ga[1:-1] /= np.tanh(ddl) + np.tanh(ddu)
dga = ga[1:] - ga[:-1]
assert (dga > 0.0).all(), f"ga not monotonically increasing: {ga}"
return dga
[docs]
class Sigma(PerGrid):
"""Sigma coordinates with optional zooming towards bottom and surface"""
def __init__(self, nz: int, *, ddl: float = 0.0, ddu: float = 0.0):
"""
Args:
nz: number of layers
ddl: zoom factor at bottom (0: no zooming, 2: strong zooming)
ddu: zoom factor at surface (0: no zooming, 2: strong zooming)
"""
super().__init__(nz)
self.dga = calculate_sigma(nz, ddl, ddu)[:, np.newaxis, np.newaxis]
def __call__(
self,
D: np.ndarray,
out: Optional[np.ndarray] = None,
where: Union[bool, npt.ArrayLike] = True,
) -> np.ndarray:
# From sigma thicknesses as fraction [dga] to layer thicknesses in m [hn]
return np.multiply(self.dga, D, out=out, where=np.asarray(where, dtype=bool))
[docs]
class GVC(PerGrid):
"""Generalized Vertical Coordinates
This blends equidistant and surface/bottom-zoomed coordinates as described in
`Burchard & Petersen (1997)
<https://doi.org/10.1002/(SICI)1097-0363(19971115)25%3A9%3C1003%3A%3AAID-FLD600%3E3.0.CO%3B2-E>`_.
It is designed to keep the thickness of either the surface or bottom layer at a constant
value, except in shallow water where all layers are assigned equal thickness.
"""
def __init__(
self,
nz: int,
*,
ddl: float = 0.0,
ddu: float = 0.0,
gamma_surf: bool = True,
Dgamma: float = 0.0,
):
"""
Args:
nz: number of layers
ddl: zoom factor at bottom (0: no zooming, 2: strong zooming)
ddu: zoom factor at surface (0: no zooming, 2: strong zooming)
gamma_surf: use layers of constant thickness ``Dgamma/nz`` at surface
(otherwise, at bottom)
Dgamma: water depth below which to use equal layer thicknesses
"""
if ddl <= 0.0 and ddu <= 0.0:
raise Exception("ddl and/or ddu must be a positive number")
if Dgamma <= 0.0:
raise Exception("Dgamma must be a positive number")
super().__init__(nz)
self.dsigma = 1.0 / nz
self.dbeta = calculate_sigma(nz, ddl, ddu)
self.k_ref = -1 if gamma_surf else 0
if self.dbeta[self.k_ref] >= self.dsigma:
raise Exception(
"This GVC parameterization would always result in equidistant layers."
" If this is desired, use Sigma instead."
)
self.Dgamma = Dgamma
# Calculate valid limit for alpha, and from that, the maximum water depth
# NB the max alpha would be alpha_lim[alpha_lim > 0.0].min()
# However, where alpha_lim > 0, we know dsigma - dbeta < 0.
# Since dsigma - dbeta > -dbeta (while both are < 0), the limit must
# then be >= 1. This limit is irrelevant as alpha <= 1
# (unless dbeta[k_ref] > dsigma, but that case was already eliminated above)
alpha_lim = -self.dbeta / (self.dsigma - self.dbeta)
alpha_min = alpha_lim[alpha_lim < 0.0].max()
denom = alpha_min * self.dsigma + (1.0 - alpha_min) * self.dbeta[self.k_ref]
self.D_max = np.inf if abs(denom) < 1e-15 else (Dgamma * self.dsigma) / denom
[docs]
def initialize(self, *grids: core.Grid, logger: logging.Logger):
super().initialize(*grids, logger=logger)
self.logger.info(
f"This GVC parameterization supports water depths up to {self.D_max:.3f} m"
)
def __call__(
self,
D: np.ndarray,
out: Optional[np.ndarray] = None,
where: Optional[np.ndarray] = None,
):
if out is None:
out = np.empty(self.dbeta.shape + D.shape)
if where is None:
where = np.full(D.shape, 1, dtype=np.intc)
_pygetm.update_gvc(
self.dsigma, self.dbeta, self.Dgamma, self.k_ref, D, where, out
)
return out
[docs]
class FromTGrid(Base):
[docs]
def update(self, timestep: float = 0.0):
if self.prescribed_hn is not None:
self.tgrid.hn.all_values[...] = self.prescribed_hn
else:
self.update_T_grid(timestep)
if not self.other_grids:
return
# Calculate thicknesses at time levels halfway in between old and new
self.tgrid.hhalf.all_values = 0.5 * (
self.tgrid.ho.all_values + self.tgrid.hn.all_values
)
# Interpolate thicknesses from T grid to other grids
# Use hhalf as other grids are assumed to be half a timestep behind T grid
for grid in self.other_grids:
self.tgrid.hhalf.interp(grid.hn)
self.tgrid.hhalf.mirror(grid.hn)
grid.hn.all_values *= grid.Dclip.all_values / grid.hn.all_values.sum(axis=0)
grid.hn.all_values[..., grid._land] = FILL_VALUE # for land-water interface
grid.hn.update_halos()
[docs]
def update_T_grid(self, timestep: float):
"""Update thicknesses `hn` for T grid"""
raise NotImplementedError
[docs]
class Adaptive(FromTGrid):
"""
Adaptive vertical coordinates based on `Hofmeister et al. (2010)
<https://doi.org/10.1016/j.ocemod.2009.12.003>`_ and `their GETM
implementation <https://sourceforge.net/p/getm/code/ci/iow/tree/src/3d/adaptive_coordinates_6.F90>`_
"""
def __init__(
self,
nz: int,
*,
ddu: float = 0.0,
ddl: float = 0.0,
gamma_surf: bool = True,
Dgamma: float = 0.0,
csigma: float = 0.0,
cgvc: float = 1.0,
chsurf: float = 0.0,
hsurf: float = 0.5,
chmidd: float = 0.0,
hmidd: float = -4.0,
chbott: float = 0.0,
hbott: float = -0.25,
decay: float = 2.0 / 3.0,
hpow: int = 3,
cneigh: float = 0.0,
rneigh: float = 0.25,
cNN: float = 0.0,
drho: float = 0.3,
cSS: float = 0.0,
dvel: float = 0.1,
chmin: float = 0.0,
hmin: float = 0.3,
nvfilter: int = 1,
vfilter: float = 0.3,
nhfilter: int = 1,
hfilter: float = 0.4,
timescale: float = 14400.0,
):
"""
Initial layer thicknesses are based on Generalized Vertical Coordinates
(:class:`GVC`), parameterized by `ddu`, `ddl`, `gamma_surf`, and `Dgamma`.
This is equivalent to uniform sigma coordinates when both `ddu` and `ddl`,
are zero, and to zoomed sigma coordinates when `Dgamma` is zero.
Args:
nz: number of layers
ddu: zoom factor at surface (0: no zooming, 2: strong zooming)
ddl: zoom factor at bottom (0: no zooming, 2: strong zooming)
gamma_surf: use layers of constant thickness `Dgamma/nz` at surface (otherwise, at bottom)
Dgamma: water depth below which to use equal layer thicknesses
csigma: tendency towards equal layer thicknesses (uniform sigma)
DEPRECATED: for (zoomed) sigma background, use cgvc>0.0 with Dgamma=0 instead
cgvc: tendency towards Generalized Vertical Coordinates (:class:`GVC`),
as parameterized by `ddu`, `ddl`, `gamma_surf`, `Dgamma`.
LEAVE AT 1, so that all other tendencies are relative to this background,
and tune timescale to adjust the rate of convergence
chsurf: tendency to zoom in (prefer thinner layers) towards the surface
hsurf: reference thickness for surface layer
(absolute thickness in m if >0, relative to average thickness D/nz if <0)
chmidd: tendency to keep all layers bounded
hmidd: reference thickness for other layers
(absolute thickness in m if >0, relative to average thickness D/nz if <0)
chbott: tendency to zoom in (prefer thinner layers) towards the bottom
hbott: reference thickness for bottom layer
(absolute thickness in m if >0, relative to average thickness D/nz if <0)
decay: fraction of surface/bottom zooming tendencies (controlled by `chsurf`,
`chbott`) to preserve for each additional layer away from surface/bottom.
It must range between 0 (only apply tendency in targeted layer) and
1 (apply same tendency in all layers)
hpow: exponent for ramp of tendencies focused on reference thicknesses,
that is, the tendencies controlled by `chsurf`, `chbott`, `chmidd`.
cneigh: tendency to keep neighbors of similar size
rneigh: relative difference with the thinnest neighbor where
size-ratio-limiting tendency reaches its maximum value `cneigh`
cNN: tendency to zoom in on stable density gradients
drho: local density difference over distance D/nz (average layer thickness)
where to stop zooming in further on stable density gradients.
There the tendency reaches its maximum value of `cNN`
cSS: tendency to zoom in on horizontal velocity gradients (shear)
dvel: local difference in horizontal velocity over distance D/nz (average
layer thickness) where to stop zooming in further on velocity gradients.
There the tendency reaches its maximum value of `cSS`
chmin: tendency towards equal layer thicknesses in shallow-water regions,
maximized when average layer thickness D/nz drops below 2/3 hmin,
and reduced to zero when average layer thickness exceeds hmin
hmin: minimum layer thickness (m)
nvfilter: number of vertical filter iterations
vfilter: strength of vertical filter-of-Dgrid [0:~0.5]
nhfilter: number of horizontal filter iterations
hfilter: strength of horizontal filter-of-Dgrid [0:~0.5]
timescale: time scale of grid adaptation (s)
"""
if csigma < 0.0:
raise Exception("csigma must be non-negative")
if cgvc < 0.0:
raise Exception("cgvc must be non-negative")
if chsurf < 0.0:
raise Exception("chsurf must be non-negative")
if chbott < 0.0:
raise Exception("chbott must be non-negative")
if chmidd < 0.0:
raise Exception("chmidd must be non-negative")
if chsurf > 0.0 and hsurf == 0.0:
raise Exception("hsurf must be non-zero when chsurf is positive")
if chbott > 0.0 and hbott == 0.0:
raise Exception("hbott must be non-zero when chbott is positive")
if chmidd > 0.0 and hmidd == 0.0:
raise Exception("hmidd must be non-zero when chmidd is positive")
if decay < 0.0 or decay > 1.0:
raise Exception("decay must be between 0 and 1")
if hpow < 1:
raise Exception("hpow must be a positive integer")
if cneigh < 0.0:
raise Exception("cneigh must be non-negative")
if cneigh > 0.0 and rneigh <= 0.0:
raise Exception("rneigh must be positive when cneigh is positive")
if hmin < 0.0:
raise Exception("hmin must be non-negative")
if chmin < 0.0:
raise Exception("chmin must be non-negative")
if cNN < 0.0:
raise Exception("cNN must be non-negative")
if cNN > 0.0 and drho <= 0.0:
raise Exception("drho must be positive when cNN is positive")
if cSS < 0.0:
raise Exception("cSS must be non-negative")
if cSS > 0.0 and dvel <= 0.0:
raise Exception("dvel must be positive when cSS is positive")
if vfilter < 0.0 or vfilter > 0.5:
raise Exception("vfilter must be between 0 and 0.5")
if vfilter > 0.0 and nvfilter < 0:
raise Exception("nvfilter must be non-negative when vfilter is positive")
if hfilter < 0.0 or hfilter > 0.5:
raise Exception("hfilter must be between 0 and 0.5")
if hfilter > 0.0 and nhfilter < 0:
raise Exception("nhfilter must be non-negative when hfilter is positive")
if timescale <= 0.0:
raise Exception("timescale must be a positive value")
if vfilter == 0.0:
nvfilter = 0
if hfilter == 0.0:
nhfilter = 0
super().__init__(nz)
self.decay = decay
self.hpow = hpow
self.csigma = csigma
self.cgvc = cgvc
self.chsurf = chsurf
self.chbott = chbott
self.chmidd = chmidd
self.hsurf = hsurf
self.hbott = hbott
self.hmidd = hmidd
self.cneigh = cneigh
self.rneigh = rneigh
self.cNN = cNN
self.drho = drho
self.cSS = cSS
self.dvel = dvel
self.chmin = chmin
self.hmin = hmin
self.nvfilter = nvfilter
self.vfilter = vfilter
self.nhfilter = nhfilter
self.hfilter = hfilter
# self.split = split
self.split = 1
self.timescale = timescale
if self.cgvc == 0.0:
self._gvc = None
elif Dgamma == 0.0:
self._gvc = Sigma(nz, ddu=ddu, ddl=ddl)
self._gvc.k_ref = -1 if ddu >= ddl else 0
else:
self._gvc = GVC(nz, ddu=ddu, ddl=ddl, gamma_surf=gamma_surf, Dgamma=Dgamma)
[docs]
def initialize(
self,
tgrid: core.Grid,
*other_grids: core.Grid,
logger: logging.Logger,
):
super().initialize(tgrid, *other_grids, logger=logger)
logger.warning("Support for adaptive vertical coordinates is experimental")
if self.csigma == 0.0 and self.cgvc == 0.0:
logger.warning(
f"Not relaxing to any background layer distribution (sigma or gvc)."
)
if self.hfilter > 0.0 and (tgrid.halox < 1 or tgrid.haloy < 1):
logger.warning("Disabling horizontal filter because grid has no halo zones")
self.nhfilter = 0
if self._gvc:
self._gvc.initialize(tgrid, logger=logger)
self.nug = tgrid.array(
name="nug",
units="s-1",
long_name="vertical grid diffusivity",
z=CENTERS,
attrs=dict(_time_varying=TimeVarying.MACRO, _mask_output=True),
fill_value=FILL_VALUE,
)
self.ga = tgrid.array(
name="ga",
long_name="gamma coordinate",
z=INTERFACES,
attrs=dict(_time_varying=TimeVarying.MACRO, _mask_output=True),
)
# Obtain additional fields used by adaptive coordinates
# NN and SS should maybe be interpolated to centers
self.NN = tgrid.fields["NN"]
self.SS = tgrid.fields["SS"]
havg = tgrid.H.all_values / self.nz # ignoring elevation!
self.ihsurf = self.ihbott = self.ihmidd = tgrid._work.all_values
if self.chsurf > 0.0:
hsurf = self.hsurf if self.hsurf > 0.0 else -self.hsurf * havg
self.ihsurf = np.full_like(tgrid.H.all_values, 1.0 / hsurf)
if self.chbott > 0.0:
hbott = self.hbott if self.hbott > 0.0 else -self.hbott * havg
self.ihbott = np.full_like(tgrid.H.all_values, 1.0 / hbott)
if self.chmidd > 0.0:
hmidd = self.hmidd if self.hmidd > 0.0 else -self.hmidd * havg
self.ihmidd = np.full_like(tgrid.H.all_values, 1.0 / hmidd)
self.sdecay = self.decay ** np.arange(self.nz)[::-1]
self.bdecay = self.decay ** np.arange(self.nz)
self.D = tgrid.Dclip
self.hn = tgrid.hn
self.ho = tgrid.ho
self.mask = tgrid.mask
[docs]
def update_T_grid(self, timestep: float):
"""Update thicknesses `hn` for T grid"""
# Construct the grid diffusivity, which is defined per layer, and equal
# to the relative rate (tendency, in s-1) at which the layer "gives" its
# thickness to each neighbor. That is, interior layers (non-surface/bottom)
# will lose thickness delta_sigma over both interfaces, at a combined rate
# 2*nug*delta_sigma. Simultaneously they will gain thickness from both of
# their neighbors at rates nug(k-1)*delta_sigma(k-1) and
# nug(k+1)*delta_sigma(k+1).
# While constructing the tendencies, we use dimensionless rates (csigma etc.)
# These are later divided by the adaptation timescale to obtain
# the final nug in s-1.
# Tendency towards pure (non-zoomed) sigma coordinates
# This is constant for all layers in order to ensure that an equilibrium
# distribution (equal thickness fluxes in and out) is reached when all
# layers have equal thickness.
self.nug.all_values.fill(self.csigma)
# Tendency towards Generalized Vertical Coordinates (incl. zoomed sigma)
# NB the stationary solution of AVC has thicknesses proportional to
# the inverse of diffusivity (nug). The division by hn below is
# therefore necessary; its scaling with surface/bottom thickness only
# ensures the tendency (rate of convergence) equals cgvc at the surface.
if self._gvc:
self._gvc(self.D.all_values, out=self.hn.all_values)
self.nug.all_values += np.divide(
self.cgvc * self.hn.all_values[self._gvc.k_ref], self.hn.all_values
)
if timestep == 0.0:
# Equilibrium thicknesses are inversely proportional to nug.
# Use this to set initial thicknesses based on constant nug contributions.
# This supports mixtures of sigma and gvc (csigma > 0 and cgvc > 0).
if self.csigma == 0.0 and self.cgvc == 0.0:
self.logger.warning(
"Initializing without background distribution (csigma=cgvc=0)."
" Initial layer thicknesses will be uniform."
)
self.hn.fill(self.D.all_values / self.nz)
else:
self.hn.all_values = 1.0 / self.nug.all_values
self.hn.mirror()
self.hn.update_halos()
scale = self.D.all_values / self.hn.all_values.sum(axis=0)
self.hn.fill(self.hn.all_values * scale)
return
# Reconstruct old sigma positions (from -1 at bottom to 0 at surface)
# NB the values of ga will usually not be identical to those produced
# by the previous adaptive coordinate update due to river inflow and
# precipitation. Therefore we compute it anew from old layer thicknesses,
# which already incorporate these freshwater inputs (unlike, for
# example, the grid's interface coordinates zf!)
_pygetm.thickness2interface_depth(self.mask, self.ho, self.ga)
self.ga.all_values *= -1.0 / self.ga.all_values[0]
# then add contributions handled by Fortran
_pygetm.update_adaptive(
self.nug,
self.ga,
self.NN,
self.SS,
self.sdecay,
self.bdecay,
self.hpow,
self.chsurf,
self.ihsurf,
self.chmidd,
self.ihmidd,
self.chbott,
self.ihbott,
self.cneigh,
self.rneigh,
self.cNN,
self.drho,
self.cSS,
self.dvel,
self.chmin,
self.hmin,
)
min_nug = self.nug.ma.min()
assert min_nug >= 0.0, f"Negative nug values encountered: min={min_nug}"
# apply diffusion timescale
self.nug.all_values *= 2.0 / self.timescale
# apply vertical filtering from ~/python/src/filters.F90
if self.nvfilter > 0:
_pygetm.vertical_filter(self.nvfilter, self.nug, self.vfilter)
# apply horizontal filtering from ~/python/src/filters.F90
# requires halo updates
for _ in range(self.nhfilter):
self.nug.mirror()
self.nug.update_halos()
_pygetm.horizontal_filter(self.nug, self.hfilter)
# Diffuse the interface positions using the calculated tendencies
_pygetm.tridiagonal(self.nug, self.ga, timestep)
# Calculate sigma thicknesses from updated interface positions
np.subtract(
self.ga.all_values[1:], self.ga.all_values[:-1], out=self.hn.all_values
)
# Ensure sigma thicknesses are up to date on open boundaries and in halo zones
self.hn.mirror()
self.hn.update_halos()
self.hn.all_values *= self.D.all_values