Source code for pygetm.vertical_coordinates

from typing import Optional, Union
import logging

import numpy as np
import numpy.typing as npt

from .constants import CENTERS, INTERFACES, TimeVarying, ZERO_GRADIENT, FILL_VALUE
from . import core
from . import _pygetm


[docs] class Base: """Base class. It is responsible for updating layer thicknesses ``hn`` for all grids provided to initialize""" logger: logging.Logger def __init__(self, nz: int): if nz <= 0: raise Exception("Number of layers nz must be a positive number") self.nz = nz self.prescribed_hn: Optional[npt.NDArray[np.float64]] = None
[docs] def initialize( self, tgrid: core.Grid, *other_grids: core.Grid, logger: logging.Logger ): self.logger = logger self.tgrid = tgrid self.other_grids = other_grids
[docs] def update(self, timestep: float): """Update layer thicknesses hn for all grids""" raise NotImplementedError
[docs] class PerGrid(Base): """Base class for vertical coordinate types that apply the same operation to every grid."""
[docs] def initialize(self, *grids: core.Grid, logger: logging.Logger): super().initialize(*grids, logger=logger) self.grid_info = [self.prepare_update_args(grid) for grid in grids]
[docs] def prepare_update_args(self, grid: core.Grid): """Prepare grid-specific information that will be passed as arguments to __call__""" return (grid.Dclip.all_values, grid.hn.all_values, grid.mask.all_values)
[docs] def update(self, timestep: float): """Update all grids""" if self.prescribed_hn is not None: self.tgrid.hn.all_values = self.prescribed_hn else: self(*self.grid_info[0]) if self.other_grids: for gi in self.grid_info[1:]: self(*gi) self.tgrid.hhalf.all_values = 0.5 * ( self.tgrid.ho.all_values + self.tgrid.hn.all_values )
[docs] def __call__( self, D: np.ndarray, out: Optional[np.ndarray] = None, where: Union[bool, npt.ArrayLike] = True, ): """Calculate layer thicknesses Args: D: water depths (m) out: array to hold layer thicknesses. It must have shape ``(nz,) + D.shape`` where: locations where to compute thicknesses (typically: water points). It must be broadcastable to the shape of ``D`` """ raise NotImplementedError( "Classes that inherit from PerGrid must implement __call__" )
[docs] def calculate_sigma(nz: int, ddl: float = 0.0, ddu: float = 0.0) -> np.ndarray: """Return sigma thicknesses (fraction of column depth) of all layers, using a formulation that allows zooming towards the surface and bottom. Args: nz: number of layers ddl: zoom factor at bottom (0: no zooming, 2: strong zooming) ddu: zoom factor at surface (0: no zooming, 2: strong zooming) """ if ddl <= 0.0 and ddu <= 0.0: return np.broadcast_to(1.0 / nz, (nz,)) ddl, ddu = max(ddl, 0.0), max(ddu, 0.0) # This zooming routine is from Antoine Garapon, ICCH, DK ga = np.linspace(0.0, 1.0, nz + 1) ga[1:-1] = np.tanh((ddl + ddu) * ga[1:-1] - ddl) + np.tanh(ddl) ga[1:-1] /= np.tanh(ddl) + np.tanh(ddu) dga = ga[1:] - ga[:-1] assert (dga > 0.0).all(), f"ga not monotonically increasing: {ga}" return dga
[docs] class Sigma(PerGrid): """Sigma coordinates with optional zooming towards bottom and surface""" def __init__(self, nz: int, *, ddl: float = 0.0, ddu: float = 0.0): """ Args: nz: number of layers ddl: zoom factor at bottom (0: no zooming, 2: strong zooming) ddu: zoom factor at surface (0: no zooming, 2: strong zooming) """ super().__init__(nz) self.dga = calculate_sigma(nz, ddl, ddu)[:, np.newaxis, np.newaxis] def __call__( self, D: np.ndarray, out: Optional[np.ndarray] = None, where: Union[bool, npt.ArrayLike] = True, ) -> np.ndarray: # From sigma thicknesses as fraction [dga] to layer thicknesses in m [hn] return np.multiply(self.dga, D, out=out, where=np.asarray(where, dtype=bool))
[docs] class GVC(PerGrid): """Generalized Vertical Coordinates This blends equidistant and surface/bottom-zoomed coordinates as described in `Burchard & Petersen (1997) <https://doi.org/10.1002/(SICI)1097-0363(19971115)25%3A9%3C1003%3A%3AAID-FLD600%3E3.0.CO%3B2-E>`_. It is designed to keep the thickness of either the surface or bottom layer at a constant value, except in shallow water where all layers are assigned equal thickness. """ def __init__( self, nz: int, *, ddl: float = 0.0, ddu: float = 0.0, gamma_surf: bool = True, Dgamma: float = 0.0, ): """ Args: nz: number of layers ddl: zoom factor at bottom (0: no zooming, 2: strong zooming) ddu: zoom factor at surface (0: no zooming, 2: strong zooming) gamma_surf: use layers of constant thickness ``Dgamma/nz`` at surface (otherwise, at bottom) Dgamma: water depth below which to use equal layer thicknesses """ if ddl <= 0.0 and ddu <= 0.0: raise Exception("ddl and/or ddu must be a positive number") if Dgamma <= 0.0: raise Exception("Dgamma must be a positive number") super().__init__(nz) self.dsigma = 1.0 / nz self.dbeta = calculate_sigma(nz, ddl, ddu) self.k_ref = -1 if gamma_surf else 0 if self.dbeta[self.k_ref] >= self.dsigma: raise Exception( "This GVC parameterization would always result in equidistant layers." " If this is desired, use Sigma instead." ) self.Dgamma = Dgamma # Calculate valid limit for alpha, and from that, the maximum water depth # NB the max alpha would be alpha_lim[alpha_lim > 0.0].min() # However, where alpha_lim > 0, we know dsigma - dbeta < 0. # Since dsigma - dbeta > -dbeta (while both are < 0), the limit must # then be >= 1. This limit is irrelevant as alpha <= 1 # (unless dbeta[k_ref] > dsigma, but that case was already eliminated above) alpha_lim = -self.dbeta / (self.dsigma - self.dbeta) alpha_min = alpha_lim[alpha_lim < 0.0].max() denom = alpha_min * self.dsigma + (1.0 - alpha_min) * self.dbeta[self.k_ref] self.D_max = np.inf if abs(denom) < 1e-15 else (Dgamma * self.dsigma) / denom
[docs] def initialize(self, *grids: core.Grid, logger: logging.Logger): super().initialize(*grids, logger=logger) self.logger.info( f"This GVC parameterization supports water depths up to {self.D_max:.3f} m" )
def __call__( self, D: np.ndarray, out: Optional[np.ndarray] = None, where: Optional[np.ndarray] = None, ): if out is None: out = np.empty(self.dbeta.shape + D.shape) if where is None: where = np.full(D.shape, 1, dtype=np.intc) _pygetm.update_gvc( self.dsigma, self.dbeta, self.Dgamma, self.k_ref, D, where, out ) return out
[docs] class FromTGrid(Base):
[docs] def update(self, timestep: float = 0.0): if self.prescribed_hn is not None: self.tgrid.hn.all_values[...] = self.prescribed_hn else: self.update_T_grid(timestep) if not self.other_grids: return # Calculate thicknesses at time levels halfway in between old and new self.tgrid.hhalf.all_values = 0.5 * ( self.tgrid.ho.all_values + self.tgrid.hn.all_values ) # Interpolate thicknesses from T grid to other grids # Use hhalf as other grids are assumed to be half a timestep behind T grid for grid in self.other_grids: self.tgrid.hhalf.interp(grid.hn) self.tgrid.hhalf.mirror(grid.hn) grid.hn.all_values *= grid.Dclip.all_values / grid.hn.all_values.sum(axis=0) grid.hn.all_values[..., grid._land] = FILL_VALUE # for land-water interface grid.hn.update_halos()
[docs] def update_T_grid(self, timestep: float): """Update thicknesses `hn` for T grid""" raise NotImplementedError
[docs] class Adaptive(FromTGrid): """ Adaptive vertical coordinates based on `Hofmeister et al. (2010) <https://doi.org/10.1016/j.ocemod.2009.12.003>`_ and `their GETM implementation <https://sourceforge.net/p/getm/code/ci/iow/tree/src/3d/adaptive_coordinates_6.F90>`_ """ def __init__( self, nz: int, *, ddu: float = 0.0, ddl: float = 0.0, gamma_surf: bool = True, Dgamma: float = 0.0, csigma: float = 0.0, cgvc: float = 1.0, chsurf: float = 0.0, hsurf: float = 0.5, chmidd: float = 0.0, hmidd: float = -4.0, chbott: float = 0.0, hbott: float = -0.25, decay: float = 2.0 / 3.0, hpow: int = 3, cneigh: float = 0.0, rneigh: float = 0.25, cNN: float = 0.0, drho: float = 0.3, cSS: float = 0.0, dvel: float = 0.1, chmin: float = 0.0, hmin: float = 0.3, nvfilter: int = 1, vfilter: float = 0.3, nhfilter: int = 1, hfilter: float = 0.4, timescale: float = 14400.0, ): """ Initial layer thicknesses are based on Generalized Vertical Coordinates (:class:`GVC`), parameterized by `ddu`, `ddl`, `gamma_surf`, and `Dgamma`. This is equivalent to uniform sigma coordinates when both `ddu` and `ddl`, are zero, and to zoomed sigma coordinates when `Dgamma` is zero. Args: nz: number of layers ddu: zoom factor at surface (0: no zooming, 2: strong zooming) ddl: zoom factor at bottom (0: no zooming, 2: strong zooming) gamma_surf: use layers of constant thickness `Dgamma/nz` at surface (otherwise, at bottom) Dgamma: water depth below which to use equal layer thicknesses csigma: tendency towards equal layer thicknesses (uniform sigma) DEPRECATED: for (zoomed) sigma background, use cgvc>0.0 with Dgamma=0 instead cgvc: tendency towards Generalized Vertical Coordinates (:class:`GVC`), as parameterized by `ddu`, `ddl`, `gamma_surf`, `Dgamma`. LEAVE AT 1, so that all other tendencies are relative to this background, and tune timescale to adjust the rate of convergence chsurf: tendency to zoom in (prefer thinner layers) towards the surface hsurf: reference thickness for surface layer (absolute thickness in m if >0, relative to average thickness D/nz if <0) chmidd: tendency to keep all layers bounded hmidd: reference thickness for other layers (absolute thickness in m if >0, relative to average thickness D/nz if <0) chbott: tendency to zoom in (prefer thinner layers) towards the bottom hbott: reference thickness for bottom layer (absolute thickness in m if >0, relative to average thickness D/nz if <0) decay: fraction of surface/bottom zooming tendencies (controlled by `chsurf`, `chbott`) to preserve for each additional layer away from surface/bottom. It must range between 0 (only apply tendency in targeted layer) and 1 (apply same tendency in all layers) hpow: exponent for ramp of tendencies focused on reference thicknesses, that is, the tendencies controlled by `chsurf`, `chbott`, `chmidd`. cneigh: tendency to keep neighbors of similar size rneigh: relative difference with the thinnest neighbor where size-ratio-limiting tendency reaches its maximum value `cneigh` cNN: tendency to zoom in on stable density gradients drho: local density difference over distance D/nz (average layer thickness) where to stop zooming in further on stable density gradients. There the tendency reaches its maximum value of `cNN` cSS: tendency to zoom in on horizontal velocity gradients (shear) dvel: local difference in horizontal velocity over distance D/nz (average layer thickness) where to stop zooming in further on velocity gradients. There the tendency reaches its maximum value of `cSS` chmin: tendency towards equal layer thicknesses in shallow-water regions, maximized when average layer thickness D/nz drops below 2/3 hmin, and reduced to zero when average layer thickness exceeds hmin hmin: minimum layer thickness (m) nvfilter: number of vertical filter iterations vfilter: strength of vertical filter-of-Dgrid [0:~0.5] nhfilter: number of horizontal filter iterations hfilter: strength of horizontal filter-of-Dgrid [0:~0.5] timescale: time scale of grid adaptation (s) """ if csigma < 0.0: raise Exception("csigma must be non-negative") if cgvc < 0.0: raise Exception("cgvc must be non-negative") if chsurf < 0.0: raise Exception("chsurf must be non-negative") if chbott < 0.0: raise Exception("chbott must be non-negative") if chmidd < 0.0: raise Exception("chmidd must be non-negative") if chsurf > 0.0 and hsurf == 0.0: raise Exception("hsurf must be non-zero when chsurf is positive") if chbott > 0.0 and hbott == 0.0: raise Exception("hbott must be non-zero when chbott is positive") if chmidd > 0.0 and hmidd == 0.0: raise Exception("hmidd must be non-zero when chmidd is positive") if decay < 0.0 or decay > 1.0: raise Exception("decay must be between 0 and 1") if hpow < 1: raise Exception("hpow must be a positive integer") if cneigh < 0.0: raise Exception("cneigh must be non-negative") if cneigh > 0.0 and rneigh <= 0.0: raise Exception("rneigh must be positive when cneigh is positive") if hmin < 0.0: raise Exception("hmin must be non-negative") if chmin < 0.0: raise Exception("chmin must be non-negative") if cNN < 0.0: raise Exception("cNN must be non-negative") if cNN > 0.0 and drho <= 0.0: raise Exception("drho must be positive when cNN is positive") if cSS < 0.0: raise Exception("cSS must be non-negative") if cSS > 0.0 and dvel <= 0.0: raise Exception("dvel must be positive when cSS is positive") if vfilter < 0.0 or vfilter > 0.5: raise Exception("vfilter must be between 0 and 0.5") if vfilter > 0.0 and nvfilter < 0: raise Exception("nvfilter must be non-negative when vfilter is positive") if hfilter < 0.0 or hfilter > 0.5: raise Exception("hfilter must be between 0 and 0.5") if hfilter > 0.0 and nhfilter < 0: raise Exception("nhfilter must be non-negative when hfilter is positive") if timescale <= 0.0: raise Exception("timescale must be a positive value") if vfilter == 0.0: nvfilter = 0 if hfilter == 0.0: nhfilter = 0 super().__init__(nz) self.decay = decay self.hpow = hpow self.csigma = csigma self.cgvc = cgvc self.chsurf = chsurf self.chbott = chbott self.chmidd = chmidd self.hsurf = hsurf self.hbott = hbott self.hmidd = hmidd self.cneigh = cneigh self.rneigh = rneigh self.cNN = cNN self.drho = drho self.cSS = cSS self.dvel = dvel self.chmin = chmin self.hmin = hmin self.nvfilter = nvfilter self.vfilter = vfilter self.nhfilter = nhfilter self.hfilter = hfilter # self.split = split self.split = 1 self.timescale = timescale if self.cgvc == 0.0: self._gvc = None elif Dgamma == 0.0: self._gvc = Sigma(nz, ddu=ddu, ddl=ddl) self._gvc.k_ref = -1 if ddu >= ddl else 0 else: self._gvc = GVC(nz, ddu=ddu, ddl=ddl, gamma_surf=gamma_surf, Dgamma=Dgamma)
[docs] def initialize( self, tgrid: core.Grid, *other_grids: core.Grid, logger: logging.Logger, ): super().initialize(tgrid, *other_grids, logger=logger) logger.warning("Support for adaptive vertical coordinates is experimental") if self.csigma == 0.0 and self.cgvc == 0.0: logger.warning( f"Not relaxing to any background layer distribution (sigma or gvc)." ) if self.hfilter > 0.0 and (tgrid.halox < 1 or tgrid.haloy < 1): logger.warning("Disabling horizontal filter because grid has no halo zones") self.nhfilter = 0 if self._gvc: self._gvc.initialize(tgrid, logger=logger) self.nug = tgrid.array( name="nug", units="s-1", long_name="vertical grid diffusivity", z=CENTERS, attrs=dict(_time_varying=TimeVarying.MACRO, _mask_output=True), fill_value=FILL_VALUE, ) self.ga = tgrid.array( name="ga", long_name="gamma coordinate", z=INTERFACES, attrs=dict(_time_varying=TimeVarying.MACRO, _mask_output=True), ) # Obtain additional fields used by adaptive coordinates # NN and SS should maybe be interpolated to centers self.NN = tgrid.fields["NN"] self.SS = tgrid.fields["SS"] havg = tgrid.H.all_values / self.nz # ignoring elevation! self.ihsurf = self.ihbott = self.ihmidd = tgrid._work.all_values if self.chsurf > 0.0: hsurf = self.hsurf if self.hsurf > 0.0 else -self.hsurf * havg self.ihsurf = np.full_like(tgrid.H.all_values, 1.0 / hsurf) if self.chbott > 0.0: hbott = self.hbott if self.hbott > 0.0 else -self.hbott * havg self.ihbott = np.full_like(tgrid.H.all_values, 1.0 / hbott) if self.chmidd > 0.0: hmidd = self.hmidd if self.hmidd > 0.0 else -self.hmidd * havg self.ihmidd = np.full_like(tgrid.H.all_values, 1.0 / hmidd) self.sdecay = self.decay ** np.arange(self.nz)[::-1] self.bdecay = self.decay ** np.arange(self.nz) self.D = tgrid.Dclip self.hn = tgrid.hn self.ho = tgrid.ho self.mask = tgrid.mask
[docs] def update_T_grid(self, timestep: float): """Update thicknesses `hn` for T grid""" # Construct the grid diffusivity, which is defined per layer, and equal # to the relative rate (tendency, in s-1) at which the layer "gives" its # thickness to each neighbor. That is, interior layers (non-surface/bottom) # will lose thickness delta_sigma over both interfaces, at a combined rate # 2*nug*delta_sigma. Simultaneously they will gain thickness from both of # their neighbors at rates nug(k-1)*delta_sigma(k-1) and # nug(k+1)*delta_sigma(k+1). # While constructing the tendencies, we use dimensionless rates (csigma etc.) # These are later divided by the adaptation timescale to obtain # the final nug in s-1. # Tendency towards pure (non-zoomed) sigma coordinates # This is constant for all layers in order to ensure that an equilibrium # distribution (equal thickness fluxes in and out) is reached when all # layers have equal thickness. self.nug.all_values.fill(self.csigma) # Tendency towards Generalized Vertical Coordinates (incl. zoomed sigma) # NB the stationary solution of AVC has thicknesses proportional to # the inverse of diffusivity (nug). The division by hn below is # therefore necessary; its scaling with surface/bottom thickness only # ensures the tendency (rate of convergence) equals cgvc at the surface. if self._gvc: self._gvc(self.D.all_values, out=self.hn.all_values) self.nug.all_values += np.divide( self.cgvc * self.hn.all_values[self._gvc.k_ref], self.hn.all_values ) if timestep == 0.0: # Equilibrium thicknesses are inversely proportional to nug. # Use this to set initial thicknesses based on constant nug contributions. # This supports mixtures of sigma and gvc (csigma > 0 and cgvc > 0). if self.csigma == 0.0 and self.cgvc == 0.0: self.logger.warning( "Initializing without background distribution (csigma=cgvc=0)." " Initial layer thicknesses will be uniform." ) self.hn.fill(self.D.all_values / self.nz) else: self.hn.all_values = 1.0 / self.nug.all_values self.hn.mirror() self.hn.update_halos() scale = self.D.all_values / self.hn.all_values.sum(axis=0) self.hn.fill(self.hn.all_values * scale) return # Reconstruct old sigma positions (from -1 at bottom to 0 at surface) # NB the values of ga will usually not be identical to those produced # by the previous adaptive coordinate update due to river inflow and # precipitation. Therefore we compute it anew from old layer thicknesses, # which already incorporate these freshwater inputs (unlike, for # example, the grid's interface coordinates zf!) _pygetm.thickness2interface_depth(self.mask, self.ho, self.ga) self.ga.all_values *= -1.0 / self.ga.all_values[0] # then add contributions handled by Fortran _pygetm.update_adaptive( self.nug, self.ga, self.NN, self.SS, self.sdecay, self.bdecay, self.hpow, self.chsurf, self.ihsurf, self.chmidd, self.ihmidd, self.chbott, self.ihbott, self.cneigh, self.rneigh, self.cNN, self.drho, self.cSS, self.dvel, self.chmin, self.hmin, ) min_nug = self.nug.ma.min() assert min_nug >= 0.0, f"Negative nug values encountered: min={min_nug}" # apply diffusion timescale self.nug.all_values *= 2.0 / self.timescale # apply vertical filtering from ~/python/src/filters.F90 if self.nvfilter > 0: _pygetm.vertical_filter(self.nvfilter, self.nug, self.vfilter) # apply horizontal filtering from ~/python/src/filters.F90 # requires halo updates for _ in range(self.nhfilter): self.nug.mirror() self.nug.update_halos() _pygetm.horizontal_filter(self.nug, self.hfilter) # Diffuse the interface positions using the calculated tendencies _pygetm.tridiagonal(self.nug, self.ga, timestep) # Calculate sigma thicknesses from updated interface positions np.subtract( self.ga.all_values[1:], self.ga.all_values[:-1], out=self.hn.all_values ) # Ensure sigma thicknesses are up to date on open boundaries and in halo zones self.hn.mirror() self.hn.update_halos() self.hn.all_values *= self.D.all_values