Source code for tsdat.utils

import numpy as np
import pandas as pd
import xarray as xr
from typing import Any, List, Optional, Tuple
from pydantic import BaseModel, Extra
from numpy.typing import NDArray

__all__ = [
    "ParameterizedClass",
    "decode_cf",
    "record_corrections_applied",
    "assign_data",
    "get_start_time",
    "get_start_date_and_time_str",
    "get_filename",
]


class ParameterizedClass(BaseModel, extra=Extra.forbid):
    """------------------------------------------------------------------------------------
    Base class for any class that accepts 'parameters' as an argument.

    Sets the default 'parameters' to {}. Subclasses of ParameterizedClass should override
    the 'parameters' property to support custom required or optional arguments from
    configuration files.

    ------------------------------------------------------------------------------------"""

    parameters: Any = {}
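
# Example (illustrative sketch, not part of tsdat): a hypothetical subclass that
# overrides 'parameters' with its own pydantic model so options can be validated
# when loaded from a configuration file. All names below are made up.
#
# class FileHandlerParameters(BaseModel, extra=Extra.forbid):
#     time_format: str = "%Y-%m-%d %H:%M:%S"
#
# class CustomFileHandler(ParameterizedClass):
#     parameters: FileHandlerParameters = FileHandlerParameters()
#
# handler = CustomFileHandler(parameters={"time_format": "%Y%m%d.%H%M%S"})
# assert handler.parameters.time_format == "%Y%m%d.%H%M%S"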


def decode_cf(dataset: xr.Dataset) -> xr.Dataset:
    """---------------------------------------------------------------------------------
    Wrapper around `xarray.decode_cf()` which handles additional edge cases.

    This helps ensure that the dataset is formatted and encoded correctly after it has
    been constructed or modified. Handles edge cases for units and data type encodings
    on datetime variables.

    Args:
        dataset (xr.Dataset): The dataset to decode.

    Returns:
        xr.Dataset: The decoded dataset.

    ---------------------------------------------------------------------------------"""
    # We have to make sure that time variables do not have units set as attrs, and
    # instead have units set on the encoding or else xarray will crash when trying
    # to save: https://github.com/pydata/xarray/issues/3739
    for variable in dataset.variables.values():
        if (
            np.issubdtype(variable.data.dtype, np.datetime64)  # type: ignore
            and "units" in variable.attrs
        ):
            units = variable.attrs["units"]
            del variable.attrs["units"]
            variable.encoding["units"] = units  # type: ignore

    # Leaving the "dtype" entry in the encoding for datetime64 variables causes a crash
    # when saving the dataset. Not fixed by: https://github.com/pydata/xarray/pull/4684
    ds: xr.Dataset = xr.decode_cf(dataset)  # type: ignore
    for variable in ds.variables.values():
        if variable.data.dtype.type == np.datetime64:  # type: ignore
            if "dtype" in variable.encoding:  # type: ignore
                del variable.encoding["dtype"]  # type: ignore
    return ds
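
# Example (illustrative sketch): typical usage after constructing a dataset by hand.
# The variable names and values are assumptions used only for demonstration.
#
# ds = xr.Dataset(
#     coords={"time": ("time", pd.date_range("2022-04-24 16:53:14", periods=3, freq="s"))},
#     data_vars={"temperature": ("time", np.array([10.0, 11.2, 12.4]))},
# )
# ds["time"].attrs["units"] = "Seconds since 1970-01-01 00:00:00"
# ds = decode_cf(ds)  # moves 'units' from attrs to encoding so saving won't crash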


def record_corrections_applied(
    dataset: xr.Dataset, variable_name: str, message: str
) -> None:
    """---------------------------------------------------------------------------------
    Records the message on the 'corrections_applied' attribute.

    Args:
        dataset (xr.Dataset): The corrected dataset.
        variable_name (str): The name of the variable in the dataset.
        message (str): The message to record.

    ---------------------------------------------------------------------------------"""
    variable_attrs = dataset[variable_name].attrs
    corrections: List[str] = variable_attrs.get("corrections_applied", [])
    corrections.append(message)
    variable_attrs["corrections_applied"] = corrections
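
# Example (illustrative sketch): recording a correction note on a hypothetical
# 'temperature' variable in a dataset 'ds' (both names are assumptions).
#
# record_corrections_applied(
#     ds, "temperature", "Applied a +0.5 degC offset to correct a known sensor bias."
# )
# print(ds["temperature"].attrs["corrections_applied"])
# # ['Applied a +0.5 degC offset to correct a known sensor bias.']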


def assign_data(
    dataset: xr.Dataset, data: NDArray[Any], variable_name: str
) -> xr.Dataset:
    """---------------------------------------------------------------------------------
    Assigns the data to the specified variable in the dataset.

    If the variable exists and it is a data variable, then the DataArray for the
    specified variable in the dataset will simply have its data replaced with the new
    numpy array. If the variable exists and it is a coordinate variable, then the data
    will replace the coordinate data. If the variable does not exist in the dataset then
    a KeyError will be raised.

    Args:
        dataset (xr.Dataset): The dataset where the data should be assigned.
        data (NDArray[Any]): The data to assign.
        variable_name (str): The name of the variable in the dataset to assign data to.

    Raises:
        KeyError: Raises a KeyError if the specified variable is not in the dataset's
            coords or data_vars dictionary.

    Returns:
        xr.Dataset: The dataset with data assigned to it.

    ---------------------------------------------------------------------------------"""
    if variable_name in dataset.data_vars:
        dataset[variable_name].data = data
    elif variable_name in dataset.coords:
        tmp_name = f"__{variable_name}__"
        dataset = dataset.rename_vars({variable_name: tmp_name})
        # TODO: ensure attrs are copied over too
        dataset[variable_name] = xr.zeros_like(dataset[tmp_name], dtype=data.dtype)  # type: ignore
        dataset[variable_name].data[:] = data[:]
        # dataset = dataset.swap_dims({tmp_name: variable_name})  # type: ignore
        dataset = dataset.drop_vars(tmp_name)
        # dataset = dataset.rename_dims(
        #     {tmp_name: variable_name}
        # )  # FIXME: This might drop all dimensions other than the one that was just renamed
    else:
        raise KeyError(
            f"'{variable_name}' must be a coord or a data_var in the dataset to assign"
            " data to it."
        )
    return dataset
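
# Example (illustrative sketch): replacing the data behind a data variable and a
# coordinate variable. The dataset and variable names below are assumptions.
#
# ds = xr.Dataset(
#     coords={"time": ("time", pd.date_range("2022-04-24", periods=3, freq="s"))},
#     data_vars={"temperature": ("time", np.array([10.0, 11.0, 12.0]))},
# )
# ds = assign_data(ds, np.array([20.0, 21.0, 22.0]), "temperature")  # data_var path
# ds = assign_data(ds, pd.date_range("2022-04-25", periods=3, freq="s").values, "time")  # coord path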


def get_start_time(dataset: xr.Dataset) -> pd.Timestamp:
    """---------------------------------------------------------------------------------
    Gets the earliest 'time' value and returns it as a pandas Timestamp.

    Args:
        dataset (xr.Dataset): The dataset whose start time should be retrieved.

    Returns:
        pd.Timestamp: The timestamp of the earliest time value in the dataset.

    ---------------------------------------------------------------------------------"""
    time64: np.datetime64 = np.min(dataset["time"].data)  # type: ignore
    datetime: pd.Timestamp = pd.to_datetime(time64)  # type: ignore
    return datetime


def get_start_date_and_time_str(dataset: xr.Dataset) -> Tuple[str, str]:
    """---------------------------------------------------------------------------------
    Gets the start date and start time strings from a Dataset.

    The strings are formatted using strftime and the following formats:
        - date: "%Y%m%d"
        - time: "%H%M%S"

    Args:
        dataset (xr.Dataset): The dataset whose start date and time should be retrieved.

    Returns:
        Tuple[str, str]: The start date and time as strings like "YYYYmmdd", "HHMMSS".

    ---------------------------------------------------------------------------------"""
    timestamp = get_start_time(dataset)
    return timestamp.strftime("%Y%m%d"), timestamp.strftime("%H%M%S")
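
# Example (illustrative): for a dataset 'ds' whose earliest 'time' value is
# 2022-04-24 16:53:14 (values are made up), the two helpers above return:
#
# get_start_time(ds)               -> pd.Timestamp("2022-04-24 16:53:14")
# get_start_date_and_time_str(ds)  -> ("20220424", "165314")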


def get_filename(
    dataset: xr.Dataset, extension: str, title: Optional[str] = None
) -> str:
    """---------------------------------------------------------------------------------
    Returns the standardized filename for the provided dataset.

    Returns a key consisting of the dataset's datastream, starting date/time, the
    extension, and an optional title. For file-based storage systems this method may be
    used to generate the basename of the output data file by providing extension as
    '.nc', '.csv', or some other file ending type. For ancillary plot files this can be
    used in the same way by specifying extension as '.png', '.jpeg', etc and by
    specifying the title, resulting in files named like
    '<datastream>.20220424.165314.plot_title.png'.

    Args:
        dataset (xr.Dataset): The dataset (used to extract the datastream and starting /
            ending times).
        extension (str): The file extension that should be used.
        title (Optional[str]): An optional title that will be placed between the start
            time and the extension in the generated filename.

    Returns:
        str: The filename constructed from provided parameters.

    ---------------------------------------------------------------------------------"""
    datastream_name = dataset.attrs["datastream"]
    start_date, start_time = get_start_date_and_time_str(dataset)
    title = "" if title is None else f".{title}"
    extension = extension if extension.startswith(".") else f".{extension}"
    return f"{datastream_name}.{start_date}.{start_time}{title}{extension}"
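
# Example (illustrative): assuming ds.attrs["datastream"] == "abc.buoy_z06.a1"
# (a made-up datastream name) and an earliest time of 2022-04-24 16:53:14:
#
# get_filename(ds, ".nc")                      -> "abc.buoy_z06.a1.20220424.165314.nc"
# get_filename(ds, "png", title="wind_speed")  -> "abc.buoy_z06.a1.20220424.165314.wind_speed.png"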


# def get_raw_filename(
#     raw_data: Union[xr.Dataset, Dict[str, xr.Dataset]], old_filename: str, config
# ) -> str:
#     """Returns the appropriate raw filename of the raw dataset according to
#     MHKIT-Cloud naming conventions. Uses the config object to parse the
#     start date and time from the raw dataset for use in the new filename.
#
#     The new filename will follow the MHKIT-Cloud Data standards for raw
#     filenames, ie: `datastream_name.date.time.raw.old_filename`, where the
#     data level used in the datastream_name is `00`.
#
#     :param raw_data: The raw data as an xarray dataset or a dictionary
#         of form {str: xr.Dataset}.
#     :type raw_data: Union[xr.Dataset, Dict[str, xr.Dataset]]
#     :param old_filename: The name of the original raw file.
#     :type old_filename: str
#     :param config: The Config object used to assist reading time data from
#         the raw_dataset.
#     :type config: Config
#     :return: The standardized filename of the raw file.
#     :rtype: str
#     """
#     original_filename = os.path.basename(old_filename)
#     raw_datastream_name = config.pipeline_definition.input_datastream_name
#     time_var = config.dataset_definition.get_variable("time")
#
#     if isinstance(raw_data, xr.Dataset):
#         raw_data = {old_filename: raw_data}
#
#     date_times: List[str] = []
#     for filename, ds in raw_data.items():
#         try:
#             start_date, start_time = DSUtil.get_raw_start_time(ds, time_var)
#             date_time = f"{start_date}.{start_time}"
#             date_times.append(date_time)
#         except KeyError:
#             warnings.warn(
#                 f"Could not retrieve `time` from {filename} with retrieval name {time_var}"
#             )
#     assert len(date_times), f"Failed to create new filename from {old_filename}"
#
#     start_date_time = min(date_times)
#     return f"{raw_datastream_name}.{start_date_time}.raw.{original_filename}"


# def plot_qc(
#     ds: xr.Dataset, variable_name: str, filename: str = None, **kwargs
# ) -> act.plotting.TimeSeriesDisplay:
#     """Create a QC plot for the given variable. This is based on the ACT library:
#     https://arm-doe.github.io/ACT/source/auto_examples/plot_qc.html#sphx-glr-source-auto-examples-plot-qc-py
#
#     We provide a convenience wrapper method for basic QC plots of a variable, but
#     we recommend to use ACT directly and look at their examples for more complex plots
#     like plotting variables in two different datasets.
#
#     TODO: Depending on use cases, we will likely add more arguments to be able to quickly produce
#     the most common types of QC plots.
#
#     :param ds: A dataset
#     :type ds: xr.Dataset
#     :param variable_name: The variable to plot
#     :type variable_name: str
#     :param filename: The filename for the image. Saves the plot as this filename if provided.
#     :type filename: str, optional
#     """
#     datastream_name = DSUtil.get_datastream_name(ds=ds)
#
#     display = act.plotting.TimeSeriesDisplay(
#         ds, subplot_shape=(2,), ds_name=datastream_name, **kwargs
#     )
#
#     # Plot temperature data in top plot
#     display.plot(variable_name, subplot_index=(0,))
#
#     # Plot QC data
#     display.qc_flag_block_plot(variable_name, subplot_index=(1,))
#
#     if filename:
#         plt.savefig(filename)
#
#     return display


# def get_raw_start_time(
#     raw_ds: xr.Dataset, time_var_definition: "tsdat.config.VariableDefinition"
# ) -> Tuple[str, str]:
#     """Convenience method to get the start date and time from a raw xarray
#     dataset. This uses `time_var_definition.get_input_name()` as the
#     dataset key for the time variable and additionally uses the input's
#     `Converter` object if applicable.
#
#     :param raw_ds: A raw dataset (not standardized)
#     :type raw_ds: xr.Dataset
#     :param time_var_definition: The 'time' variable definition from the
#         pipeline config
#     :type time_var_definition: VariableDefinition
#     :return: A tuple of strings representing the first time data point
#         in the dataset. The first string is the day in 'yyyymmdd' format.
#         The second string is the time in 'hhmmss' format.
#     :rtype: Tuple[str, str]
#     """
#     time_var_name = time_var_definition.get_input_name()
#     time_data = raw_ds[time_var_name].data
#
#     time64_data = time_var_definition.run_converter(time_data)
#
#     start_datetime64 = np.min(time64_data)
#     return DSUtil.datetime64_to_string(start_datetime64)


# def datetime64_to_timestamp(variable_data: NDArray[Any]) -> NDArray[np.int64]:
#     """Converts each datetime64 value to a timestamp in same units as
#     the variable (eg., seconds, nanoseconds).
#
#     :param variable_data: ndarray of variable data
#     :type variable_data: np.ndarray
#     :return: An ndarray of the same shape, with time values converted to
#         long timestamps (e.g., int64)
#     :rtype: np.ndarray
#     """
#     return variable_data.astype(pd.Timestamp).astype(np.int64)  # type: ignore


# def datetime64_to_string(datetime64: np.datetime64) -> Tuple[str, str]:
#     """Convert a datetime64 object to a formatted string.
#
#     :param datetime64: The datetime64 object
#     :type datetime64: Union[np.ndarray, np.datetime64]
#     :return: A tuple of strings representing the formatted date. The first string is
#         the day in 'yyyymmdd' format. The second string is the time in 'hhmmss' format.
#     :rtype: Tuple[str, str]
#     """
#     datetime_: datetime.datetime = act.utils.datetime64_to_datetime(datetime64)[0]  # type: ignore
#     return datetime_.strftime("%Y%m%d"), datetime_.strftime("%H%M%S")


# IDEA: Method to print a summary of the list of problems with the data