Source code for pygetm.input.util
from typing import Optional
import xarray as xr
import cftime
[docs]
def replace_calendar(da: xr.DataArray, calendar: str) -> xr.DataArray:
    """Return `da` with its time coordinate re-tagged with another calendar.

    Every value of the time coordinate (obtained as ``da.getm.time``) is
    rebuilt as a :class:`cftime.datetime` that has the same year, month, day,
    hour, minute, second and microsecond, but uses `calendar`. The date/time
    fields are copied verbatim; no conversion between calendars takes place.

    Args:
        da: array whose time coordinate is to be replaced. It must expose
            that coordinate through the ``getm`` accessor.
        calendar: name of the target calendar, passed on unchanged to
            :class:`cftime.datetime`.

    Returns:
        The result of ``da.assign_coords`` with the time coordinate (same
        name as before) replaced by the rebuilt values.

    Note:
        Dates that do not exist in the target calendar are presumably
        rejected by cftime - confirm before relying on it.
    """
    tmcoord = da.getm.time
    new_time = [
        cftime.datetime(
            dt.year,
            dt.month,
            dt.day,
            dt.hour,
            dt.minute,
            dt.second,
            # Carry the sub-second part over as well; omitting it would
            # silently truncate time stamps to whole seconds.
            dt.microsecond,
            calendar=calendar,
        )
        for dt in tmcoord.values
    ]
    return da.assign_coords({tmcoord.name: new_time})
[docs]
def configure_chunking_and_compression(
    ds: "xr.Dataset", complevel: Optional[int] = None
) -> None:
    """Configure chunking and compression for all DataArrays in a Dataset.

    This is done by manipulating the `encoding` attribute of each DataArray
    in-place. This attribute controls how the data will be written to a NetCDF
    file.

    Data variables are compressed if their effective compression level is
    non-zero; otherwise any explicit chunk sizes are removed from their
    encoding. Compressed variables get chunk size 1 along their *first*
    dimension (assumed to be time; this is not verified) and chunk sizes equal
    to the full dimension size along all other dimensions.

    Coordinate variables are never compressed. Scalar (0-dimensional) data
    variables have no dimension to chunk along and are therefore handled like
    coordinates: their compression level is set to 0.

    Args:
        ds: Dataset to configure
        complevel: Compression level (0-9) for data variables, or None to use
            the original compression level (if any). If 0, no compression
            is applied.
    """

    def _configure(da: "xr.DataArray", level: Optional[int]) -> None:
        """Update the encoding of a single variable in-place."""
        encoding = da.encoding
        shape = da.shape

        if not shape:
            # A scalar has no first dimension whose chunk size could be set
            # to 1 (indexing the empty chunk list would raise IndexError),
            # so never request compression for it.
            level = 0

        if level is not None:
            encoding["complevel"] = level

        if encoding.get("complevel", 0) == 0:
            # No compression: do not prescribe any chunking at all
            encoding.pop("chunksizes", None)
            return

        # Set chunk size for the first (time) dimension to 1 to allow
        # efficient access to values at single time points
        chunksizes = list(shape)
        chunksizes[0] = 1
        encoding["chunksizes"] = chunksizes
        encoding["shuffle"] = True
        encoding["zlib"] = True
        # A chunked, compressed variable cannot also be flagged as contiguous
        encoding.pop("contiguous", None)

    # Coordinates are always written uncompressed
    for da in ds.coords.values():
        _configure(da, 0)

    for da in ds.data_vars.values():
        _configure(da, complevel)