# Source code for pygetm.input.gotm

import argparse
import datetime
from typing import Iterable, Mapping, Union
import os
from pathlib import Path

import cftime
import numpy as np
import xarray as xr

# Names of the value columns of a GOTM meteo file, in the order in which they
# are passed to get_timeseries (i.e. the order of the columns in the file).
METEO_COLUMNS = ("u10", "v10", "sp", "t2m", "hum", "tcc")

# Human-readable description per column; used as the "long_name" attribute.
METEO_LONG_NAMES = {
    "u10": "eastward wind speed",
    "v10": "northward wind speed",
    "sp": "surface pressure",
    "t2m": "air temperature @ 2 m",
    "hum": "humidity @ 2 m",
    "tcc": "total cloud cover",
}

# Units per column; used as the "units" attribute. Only the columns listed
# here get a units attribute (none is declared for sp, t2m and hum).
METEO_UNITS = {"u10": "m s-1", "v10": "m s-1", "tcc": "1"}


def get_timeseries(
    fn: Union[str, os.PathLike],
    columns: Union[Mapping[int, str], Iterable[str]],
    units: Mapping[str, str] = {},
    long_names: Mapping[str, str] = {},
) -> xr.Dataset:
    """Read a text file holding one time-stamped record per line.

    Every data line must start with a ``YYYY-MM-DD hh:mm:ss`` time stamp
    (19 characters), followed by one separator character and then
    whitespace-separated numeric values. Empty lines and lines that start
    with ``#`` or ``!`` are skipped.

    Args:
        fn: path of the file to read.
        columns: either a mapping from zero-based column index to variable
            name (any subset of the columns in the file), or an iterable
            with a name for every column in the file, in order. In the
            latter case each line must hold exactly that many values.
        units: optional mapping from variable name to the value of its
            ``units`` attribute. It is only read, never modified.
        long_names: optional mapping from variable name to the value of its
            ``long_name`` attribute. It is only read, never modified.

    Returns:
        A dataset with a ``time`` variable (``cftime.datetime`` objects) and
        one variable per entry in ``columns``, each with dimension ``time``
        and with ``encoding["source"]`` set to the file path.

    Raises:
        ValueError: if a time stamp or value cannot be parsed, time stamps
            are not in chronological order, lines hold an unexpected or
            inconsistent number of values, or the file holds no data.
    """
    ncol = None
    if not isinstance(columns, Mapping):
        # Names for every column in the file are provided in order.
        # Convert to a mapping from column index to name.
        columns = dict(enumerate(columns))
        ncol = len(columns)

    path = Path(fn)
    dts = []
    rows = []
    # Number of values every line must hold. Known up front when all columns
    # are named; otherwise taken from the first data line, so that ragged
    # files are rejected here rather than deep inside NumPy.
    width = ncol
    last_dt = None
    with path.open() as f:
        for lineno, line in enumerate(f, start=1):
            line = line.rstrip()
            if not line or line[0] in "#!":
                continue
            dt = datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S")
            if last_dt is not None and last_dt > dt:
                raise ValueError("Timestamps are not in chronological order")
            last_dt = dt
            items = line[20:].split()
            if width is None:
                width = len(items)
            elif len(items) != width:
                # Explicit check instead of assert: assertions are removed
                # when Python runs with -O.
                raise ValueError(
                    f"{path}, line {lineno}: expected {width} value(s) after"
                    f" the time stamp, found {len(items)}"
                )
            dts.append(
                cftime.datetime(
                    dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
                )
            )
            rows.append(list(map(float, items)))

    if not rows:
        # Without this check, column selection below would fail with an
        # uninformative IndexError on an empty array.
        raise ValueError(f"{path} contains no data")

    all_values = np.asarray(rows)
    timevar = xr.DataArray(np.array(dts), dims=("time",))
    name2var = dict(time=timevar)
    for icol, name in columns.items():
        attrs = {}
        if name in units:
            attrs["units"] = units[name]
        if name in long_names:
            attrs["long_name"] = long_names[name]
        ar = xr.DataArray(
            all_values[:, icol],
            coords={"time": timevar},
            dims=("time",),
            name=name,
            attrs=attrs,
        )
        ar.encoding["source"] = str(path)
        name2var[name] = ar
    return xr.Dataset(name2var)
def get_profiles(
    fn: Union[str, os.PathLike],
    columns: Union[Mapping[int, str], Iterable[str]],
    units: Mapping[str, str] = {},
    long_names: Mapping[str, str] = {},
) -> xr.Dataset:
    """Read a text file holding a sequence of time-stamped vertical profiles.

    Each profile starts with a header line: a ``YYYY-MM-DD hh:mm:ss`` time
    stamp (19 characters), one separator character, and then two integers,
    the number of levels and an orientation flag. The header is followed by
    one line per level, holding the depth coordinate and then the values at
    that level, all whitespace-separated. Empty lines and lines that start
    with ``#`` or ``!`` are skipped everywhere. All profiles must use the
    same depth coordinates.

    Args:
        fn: path of the file to read.
        columns: either a mapping from zero-based column index to variable
            name (any subset of the value columns, not counting the depth
            column), or an iterable with a name for every value column in
            the file, in order. In the latter case each level line must
            hold exactly that many values after the depth.
        units: optional mapping from variable name to the value of its
            ``units`` attribute. It is only read, never modified.
        long_names: optional mapping from variable name to the value of its
            ``long_name`` attribute. It is only read, never modified.

    Returns:
        A dataset with a ``time`` variable (``cftime.datetime`` objects) and
        one variable per entry in ``columns``, each with dimensions
        ``("time", "z")``, a ``z`` coordinate holding the depths as read
        from the file, and ``encoding["source"]`` set to the file path.

    Raises:
        ValueError: if a time stamp, header or value cannot be parsed, time
            stamps are not in chronological order, the file ends in the
            middle of a profile, level lines hold an unexpected or
            inconsistent number of values, depths differ between profiles,
            or the file holds no profiles.
    """
    ncol = None
    if not isinstance(columns, Mapping):
        # Names for every column in the file are provided in order.
        # Convert to a mapping from column index to name.
        columns = dict(enumerate(columns))
        ncol = len(columns)

    path = Path(fn)
    dts = []
    all_values = []
    z = None
    # Number of values (excluding depth) every level line must hold. Known up
    # front when all columns are named; otherwise taken from the first level.
    width = ncol
    last_dt = None
    with path.open() as f:
        # Lazy stream of (line number, text) for lines that carry data.
        # Headers and level lines are both drawn from this one iterator, so
        # blank lines and comments are skipped wherever they occur.
        data_lines = (
            (lineno, text)
            for lineno, text in enumerate((raw.rstrip() for raw in f), start=1)
            if text and text[0] not in "#!"
        )
        for lineno, header in data_lines:
            dt = datetime.datetime.strptime(header[:19], "%Y-%m-%d %H:%M:%S")
            if last_dt is not None and last_dt > dt:
                raise ValueError("Timestamps are not in chronological order")
            last_dt = dt

            fields = header[20:].split()
            try:
                nlev = int(fields[0])
                # Orientation flag (the original code treated 1 as "up").
                # It is parsed to validate the header, but does not affect
                # how the profile is stored.
                int(fields[1])
            except (IndexError, ValueError) as err:
                raise ValueError(
                    f"{path}, line {lineno}: expected two integers after"
                    " the time stamp"
                ) from err

            depths = []
            values = []
            for _ in range(nlev):
                row_no, row = next(data_lines, (None, None))
                if row is None:
                    raise ValueError("Unexpected end of file")
                depth, *items = map(float, row.split())
                if width is None:
                    width = len(items)
                elif len(items) != width:
                    # Explicit check instead of assert: assertions are
                    # removed when Python runs with -O.
                    raise ValueError(
                        f"{path}, line {row_no}: expected {width} value(s)"
                        f" after the depth, found {len(items)}"
                    )
                depths.append(depth)
                values.append(items)

            current_z = np.asarray(depths)
            if z is None:
                z = current_z
            elif not np.array_equal(current_z, z):
                # array_equal also copes with profiles of different length,
                # for which an elementwise == cannot be evaluated.
                raise ValueError(
                    f"{path}: depths of the profile at {dt} differ from"
                    " those of the first profile"
                )
            dts.append(
                cftime.datetime(
                    dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
                )
            )
            all_values.append(values)

    if z is None:
        # No header line was found; fail here instead of passing z=None on.
        raise ValueError(f"{path} contains no profiles")

    all_values = np.asarray(all_values)
    timevar = xr.DataArray(np.array(dts), dims=("time",))
    zvar = xr.DataArray(z, dims=("z",), attrs={"positive": "up", "units": "m"})
    name2var = dict(time=timevar)
    for icol, name in columns.items():
        attrs = {}
        if name in units:
            attrs["units"] = units[name]
        if name in long_names:
            attrs["long_name"] = long_names[name]
        ar = xr.DataArray(
            all_values[..., icol],
            coords={"time": timevar, "z": zvar},
            dims=("time", "z"),
            name=name,
            attrs=attrs,
        )
        ar.encoding["source"] = str(path)
        name2var[name] = ar
    return xr.Dataset(name2var)
def get_meteo(fn: Union[str, os.PathLike]) -> xr.Dataset:
    """Read a GOTM meteo file.

    The file is read with get_timeseries, using the column names, units and
    long names defined by METEO_COLUMNS, METEO_UNITS and METEO_LONG_NAMES.

    Args:
        fn: path of the meteo file to read.

    Returns:
        The dataset produced by get_timeseries for this file.
    """
    return get_timeseries(
        fn,
        columns=METEO_COLUMNS,
        units=METEO_UNITS,
        long_names=METEO_LONG_NAMES,
    )
if __name__ == "__main__":
    # Command-line use: read the meteo file named on the command line and
    # print the resulting dataset.
    cli = argparse.ArgumentParser()
    cli.add_argument("file", type=Path, help="Path to GOTM meteo file")
    options = cli.parse_args()
    print(get_meteo(options.file))