from typing import Optional, Union
import os
import numpy as np
import netCDF4
import pygetm.domain
import pygetm.open_boundaries
[docs]
def domain_from_topo(
path: Union[str, os.PathLike[str]], **kwargs
) -> pygetm.domain.Domain:
"""Create a domain object from a topo.nc file used by legacy GETM.
Args:
path: NetCDF file with legacy domain topography
**kwargs: keyword arguments that are ultimately passed to
:class:`pygetm.domain.Domain`
Bottom roughness can be prescribed with keyword argument ``z0``,
which will be passed to :class:`pygetm.domain.Domain`. This argument `must`
be provided if the topo file does not contain bottom roughness ``z0``.
If ``z0`` is present in the argument list as well as the topo file, the
argument takes priority.
"""
def _get_metrics(nc: netCDF4.Dataset):
H = nc.variables["bathymetry"]
yname, xname = H.dimensions
if hasattr(H, "missing_value"):
H = np.ma.masked_equal(H, H.missing_value)
# Follow legacy GETM in inferring regularly spaced grids from
# first and last value. This could improve accuracy if values in the topo
# file are not stored in double precision.
ncx = nc.variables[xname]
ncy = nc.variables[yname]
x = np.linspace(ncx[0], ncx[-1], ncx.size, dtype=float)
y = np.linspace(ncy[0], ncy[-1], ncy.size, dtype=float)
return x, y, H
with netCDF4.Dataset(path) as nc:
grid_type = int(np.reshape(nc["grid_type"], ()))
if grid_type == 1:
# Cartesian
x, y, H = _get_metrics(nc)
kwargs.setdefault("lon", nc.variables.get("lonc", None))
kwargs.setdefault("lat", nc.variables.get("latc", None))
domain = pygetm.domain.create_cartesian(x, y, H=H, **kwargs)
elif grid_type == 2:
# spherical
lon, lat, H = _get_metrics(nc)
domain = pygetm.domain.create_spherical(lon, lat, H=H, **kwargs)
elif grid_type == 3:
# planar curvilinear
raise NotImplementedError(
"No support yet for planar curvilinear coordinates"
)
elif grid_type == 4:
# spherical curvilinear
raise NotImplementedError(
"No support yet for spherical curvilinear coordinates"
)
else:
raise NotImplementedError(f"Unknown grid_type {grid_type} found")
if "z0" not in kwargs:
if "z0" not in nc.variables:
raise Exception(
f"Bottom roughness z0 is not present in {path}; you need to"
" provide it as keyword argument to domain_from_topo instead."
)
domain.z0 = nc["z0"]
return domain
[docs]
class DatFile:
"""Support for reading GETM dat files with comments indicated by ! or #.
Whitespace-only lines are skipped."""
def __init__(self, path: Union[str, os.PathLike[str]]):
self.path = path
self.f = open(path)
[docs]
def get_line(self) -> str:
"""Return next non-empty line"""
line = None
while not line:
line = self.f.readline()
assert (
line != ""
), f"End-of-file reached in {self.path} while trying to read next line."
line = line.split("#", 1)[0].split("!", 1)[0].strip()
return line
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.f.close()
[docs]
def load_bdyinfo(
domain: pygetm.domain.Domain,
path: Union[str, os.PathLike[str]],
type_2d: Optional[int] = None,
type_3d: Optional[int] = None,
):
"""Add open boundaries from bdyinfo.dat to domain.
Args:
domain: domain to add open boundaries to
path: data file with open boundary information
type_2d: type of 2D open boundary condition to use.
If provided, this overrides the type configured in the file.
type_3d: type of 3D open boundary condition to use.
If provided, this overrides the type configured in the file.
"""
with DatFile(path) as f:
for side in (
pygetm.open_boundaries.Side.WEST,
pygetm.open_boundaries.Side.NORTH,
pygetm.open_boundaries.Side.EAST,
pygetm.open_boundaries.Side.SOUTH,
):
n = int(f.get_line())
for _ in range(n):
# Note: for Western and Eastern boundaries, l and m are indices in x
# and y dimensions, respectively, but that is the other way around
# (y and x, respectively) for Northern and Southern boundaries.
# Note that indices are 1-based as in Fortran. We convert to the Python
# convention: 0-based indices, with the upper bound being the first
# index that is EXcluded.
l, mstart, mstop, type_2d_, type_3d_ = map(int, f.get_line().split())
domain.open_boundaries.add_by_index(
side,
l - 1,
mstart - 1,
mstop,
type_2d_ if type_2d is None else type_2d,
type_3d_ if type_3d is None else type_3d,
)
[docs]
def load_riverinfo(domain: pygetm.domain.Domain, path: Union[str, os.PathLike[str]]):
"""Add rivers from riverinfo.dat to domain
Rivers that appear multiple times in the file are meant to be split over multiple
cells. For each entry of such rivers, an index is appended to the river name to
generate a unique name for the cell-specific "river mouth". The original river
name and the number of cells it is split over are stored as attributes
``original_name`` and ``split``. These attributes can be used when accessing
river forcing for all mouths combined, for instance, the combined flow rate
stored under ``original_name`` can be divided by ``split`` to get the
flow rate for each mouth.
Args:
domain: domain to add rivers to
path: data file with river information
"""
# First count how many times each river appears.
# Rivers that appear multiple times are split over multiple cells
# and will have an index appended to their name in the final list of rivers.
name2split = {}
with DatFile(path) as f:
n = int(f.get_line())
for _ in range(n):
items = f.get_line().split()
assert len(items) in (3, 5)
name = items[2]
name2split[name] = name2split.get(name, 0) + 1
name2count = {}
with DatFile(path) as f:
n = int(f.get_line())
for _ in range(n):
items = f.get_line().split()
i, j = int(items[0]), int(items[1]) # Indices of the river cell (1-based!)
mouth_name = name = items[2]
# Depth extent: zl and zu are depths i.e. measured from the surface.
# Negative values have the following meaning - if zl < 0 use bottom
# and if zu < 0 use surface. zl and zu are optional but either none
# or both must be specified
zl, zu = np.inf, 0.0
if len(items) == 5:
zl, zu = float(items[3]), float(items[4])
zu = max(0.0, zu)
if zl < 0:
zl = np.inf
if name2split[name] > 1:
# This river is split over multiple cells; append an index to its name
imouth = name2count.get(name, 0)
mouth_name = f"{name}_{imouth}"
name2count[name] = imouth + 1
# Note: we convert from 1-based indices to 0-based indices!
# For rivers split over multiple cells, record their original name and
# the number of cells they are split over
domain.rivers.add_by_index(
mouth_name,
i - 1,
j - 1,
zl=zl,
zu=zu,
original_name=name,
split=name2split[name],
)