tsdat.io.base

Classes

DataConverter

Base class for running data conversions on retrieved raw data.

DataHandler

Groups a DataReader subclass and a DataWriter subclass together.

DataReader

Base class for reading data from an input source.

DataWriter

Base class for writing data to storage area(s).

FileHandler

DataHandler specifically tailored to reading and writing files of a specific type.

FileWriter

Base class for file-based DataWriters.

RetrievalRuleSelections

Maps variable names to the rules and conversions that should be applied.

RetrievedDataset

Maps variable names to the input DataArray the data are retrieved from.

Retriever

Base class for retrieving data used as input to tsdat pipelines.

Storage

Abstract base class for the tsdat Storage API. Subclasses of Storage are used in pipelines to persist data and ancillary files (e.g., plots).

class tsdat.io.base.DataConverter[source]

Bases: tsdat.utils.ParameterizedClass, abc.ABC

Base class for running data conversions on retrieved raw data.

Class Methods

convert

Runs the data converter on the retrieved data.

Method Descriptions

abstract convert(self, data: xarray.DataArray, variable_name: str, dataset_config: tsdat.config.dataset.DatasetConfig, retrieved_dataset: RetrievedDataset, **kwargs: Any) Optional[xarray.DataArray][source]

Runs the data converter on the retrieved data.

Parameters
  • data (xr.DataArray) – The retrieved DataArray to convert.

  • retrieved_dataset (RetrievedDataset) – The retrieved dataset containing data to convert.

  • dataset_config (DatasetConfig) – The output dataset configuration.

  • variable_name (str) – The name of the variable to convert.

Returns

Optional[xr.DataArray] – The converted DataArray for the specified variable, or None if the conversion was done in-place.
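For orientation, the convert contract can be sketched with plain Python lists standing in for xr.DataArray, so the example runs without tsdat or xarray installed. DataConverterSketch and ScaleConverter (with its scale/offset parameters) are invented for this illustration; a real converter subclasses tsdat.io.base.DataConverter.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class DataConverterSketch(ABC):
    """Stdlib stand-in mirroring the shape of tsdat.io.base.DataConverter."""

    @abstractmethod
    def convert(
        self, data: List[float], variable_name: str, **kwargs: Any
    ) -> Optional[List[float]]:
        """Return the converted data, or None if converted in-place."""


class ScaleConverter(DataConverterSketch):
    """Hypothetical linear unit conversion: new = old * scale + offset."""

    def __init__(self, scale: float = 1.0, offset: float = 0.0):
        self.scale = scale
        self.offset = offset

    def convert(self, data, variable_name, **kwargs):
        return [value * self.scale + self.offset for value in data]


# degC -> degF for a "temperature" variable
converted = ScaleConverter(scale=1.8, offset=32.0).convert([0.0, 100.0], "temperature")
```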

class tsdat.io.base.DataHandler[source]

Bases: tsdat.utils.ParameterizedClass

Groups a DataReader subclass and a DataWriter subclass together.

This provides a unified approach to data I/O. DataHandlers are typically expected to be able to round-trip the data, i.e., the following pseudocode is generally true:

handler.read(handler.write(dataset)) == dataset
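The round-trip property can be sketched with a toy in-memory handler (invented for this example; real DataHandlers pair a DataReader with a DataWriter over files or object storage):

```python
class InMemoryHandler:
    """Toy handler whose write() returns a key that read() accepts back."""

    def __init__(self):
        self._store = {}

    def write(self, dataset: dict, key: str = "default") -> str:
        self._store[key] = dict(dataset)  # copy so later mutation can't leak in
        return key

    def read(self, key: str) -> dict:
        return dict(self._store[key])


handler = InMemoryHandler()
dataset = {"time": [0, 1, 2], "temperature": [10.1, 10.4, 10.2]}
roundtripped = handler.read(handler.write(dataset))
```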

Parameters
  • reader (DataReader) – The DataReader subclass responsible for reading input data.

  • writer (DataWriter) – The DataWriter subclass responsible for writing output data.

parameters :Dict[str, Any][source]
reader :DataReader[source]
writer :DataWriter[source]

Class Methods

patch_parameters

Method Descriptions

patch_parameters(cls, v: DataReader, values: Dict[str, Any], field: pydantic.fields.ModelField)[source]
class tsdat.io.base.DataReader[source]

Bases: tsdat.utils.ParameterizedClass, abc.ABC

Base class for reading data from an input source.

Parameters
  • regex (Pattern[str]) – The regex pattern associated with the DataReader. If calling the DataReader from a tsdat pipeline, this pattern will be checked against each possible input key before the read() method is called.

Class Methods

read

Reads data given an input key.

Method Descriptions

abstract read(self, input_key: str) Union[xarray.Dataset, Dict[str, xarray.Dataset]][source]

Reads data given an input key.

Uses the input key to open a resource and load data as a xr.Dataset object or as a mapping of strings to xr.Dataset objects.

In most cases DataReaders will only need to return a single xr.Dataset, but occasionally some types of inputs necessitate that the data loaded from the input_key be returned as a mapping. For example, if the input_key is a path to a zip file containing multiple disparate datasets, then returning a mapping is appropriate.

Parameters

input_key (str) – An input key matching the DataReader’s regex pattern that should be used to load data.

Returns

Union[xr.Dataset, Dict[str, xr.Dataset]] – The raw data extracted from the provided input key.
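As a sketch of the regex gating described for this class (the pattern and file names below are made up for the example), a pipeline might filter candidate input keys against a reader's pattern before ever calling read():

```python
import re

# Hypothetical pattern for a CSV reader; only matching keys reach read().
csv_pattern = re.compile(r".*\.csv$")

input_keys = ["data/buoy.z05.00.csv", "data/buoy.z05.00.nc", "README.txt"]
matching_keys = [key for key in input_keys if csv_pattern.match(key)]
```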

class tsdat.io.base.DataWriter[source]

Bases: tsdat.utils.ParameterizedClass, abc.ABC

Base class for writing data to storage area(s).

Class Methods

write

Writes the dataset to the storage area.

Method Descriptions

abstract write(self, dataset: xarray.Dataset, **kwargs: Any) None[source]

Writes the dataset to the storage area.

This method is typically called by the tsdat storage API, which will be responsible for providing any additional parameters required by subclasses of the tsdat.io.base.DataWriter class.

Parameters

dataset (xr.Dataset) – The dataset to save.

class tsdat.io.base.FileHandler[source]

Bases: DataHandler

DataHandler specifically tailored to reading and writing files of a specific type.

Parameters
  • extension (str) – The specific file extension used for data files, e.g., “.nc”.

  • reader (DataReader) – The DataReader subclass responsible for reading input data.

  • writer (FileWriter) – The FileWriter subclass responsible for writing output data.

extension :str[source]
reader :DataReader[source]
writer :FileWriter[source]

Class Methods

no_leading_dot

Method Descriptions

no_leading_dot(cls, v: str, values: Dict[str, Any]) str[source]
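The no_leading_dot validator is not documented in detail here; a plausible reading, shown as a hedged guess rather than the actual implementation, is that it normalizes a configured extension such as ".nc" so downstream templates can insert the dot themselves:

```python
def no_leading_dot(v: str) -> str:
    """Hedged guess at the validator's behavior: strip any leading dots."""
    return v.lstrip(".")


normalized = no_leading_dot(".nc")
```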
class tsdat.io.base.FileWriter[source]

Bases: DataWriter, abc.ABC

Base class for file-based DataWriters.

Parameters

file_extension (str) – The file extension that the FileHandler should be used for, e.g., “.nc”, “.csv”, …

file_extension :str[source]

Class Methods

no_leading_dot

write

Writes the dataset to the provided filepath.

Method Descriptions

classmethod no_leading_dot(cls, v: str) str[source]
abstract write(self, dataset: xarray.Dataset, filepath: Optional[pathlib.Path] = None, **kwargs: Any) None[source]

Writes the dataset to the provided filepath.

This method is typically called by the tsdat storage API, which will be responsible for providing the filepath, including the file extension.

Parameters
  • dataset (xr.Dataset) – The dataset to save.

  • filepath (Optional[Path]) – The path to the file to save.
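A minimal file-writing sketch in the spirit of FileWriter.write, using the csv module and plain dict columns instead of xr.Dataset (column names and values are invented); in real use the storage API supplies the filepath, extension included:

```python
import csv
import tempfile
from pathlib import Path


def write_csv(dataset: dict, filepath: Path) -> None:
    """Write column-oriented data ({name: [values, ...]}) to a CSV file."""
    with open(filepath, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(dataset.keys())
        writer.writerows(zip(*dataset.values()))


dataset = {"time": [0, 1], "wind_speed": [3.2, 4.1]}
filepath = Path(tempfile.mkdtemp()) / "example.csv"
write_csv(dataset, filepath)
contents = filepath.read_text()
```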

class tsdat.io.base.RetrievalRuleSelections[source]

Bases: NamedTuple

Maps variable names to the rules and conversions that should be applied.

coords :Dict[VarName, RetrievedVariable][source]
data_vars :Dict[VarName, RetrievedVariable][source]
class tsdat.io.base.RetrievedDataset[source]

Bases: NamedTuple

Maps variable names to the input DataArray the data are retrieved from.

coords :Dict[VarName, xarray.DataArray][source]
data_vars :Dict[VarName, xarray.DataArray][source]

Class Methods

from_xr_dataset

Method Descriptions

classmethod from_xr_dataset(cls, dataset: xarray.Dataset)[source]
class tsdat.io.base.Retriever[source]

Bases: tsdat.utils.ParameterizedClass, abc.ABC

Base class for retrieving data used as input to tsdat pipelines.

Parameters

readers (Dict[str, DataReader]) – The mapping of readers that should be used to retrieve data given input_keys and optional keyword arguments provided by subclasses of Retriever.

coords :Dict[str, Dict[Pattern, RetrievedVariable]][source]

A dictionary mapping output coordinate names to the retrieval rules and preprocessing actions (e.g., DataConverters) that should be applied to each retrieved coordinate variable.

data_vars :Dict[str, Dict[Pattern, RetrievedVariable]][source]

A dictionary mapping output data variable names to the retrieval rules and preprocessing actions (e.g., DataConverters) that should be applied to each retrieved data variable.

readers :Optional[Dict[Pattern, Any]][source]

Mapping of readers that should be used to read data given input keys.

Class Methods

retrieve

Prepares the raw dataset mapping for use in downstream pipeline processes.

Method Descriptions

abstract retrieve(self, input_keys: List[str], dataset_config: tsdat.config.dataset.DatasetConfig, **kwargs: Any) xarray.Dataset[source]

Prepares the raw dataset mapping for use in downstream pipeline processes.

This is done by consolidating the data into a single xr.Dataset object. The retrieved dataset may contain additional coords and data_vars that are not defined in the output dataset. Input data converters are applied as part of the preparation process.

Parameters
  • input_keys (List[str]) – The input keys the registered DataReaders should read from.

  • dataset_config (DatasetConfig) – The specification of the output dataset.

Returns

xr.Dataset – The retrieved dataset.
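One step retrieve performs, routing each input key to a registered reader by regex, can be sketched as follows (patterns, reader names, and keys are illustrative only):

```python
import re

# Hypothetical reader registry keyed by compiled patterns.
readers = {
    re.compile(r".*\.csv$"): "CSVReader",
    re.compile(r".*\.nc$"): "NetCDFReader",
}
input_keys = ["buoy.a1.csv", "buoy.a1.nc"]

# Assign each input key to the first reader whose pattern matches it.
assignments = {}
for key in input_keys:
    for pattern, reader_name in readers.items():
        if pattern.match(key):
            assignments[key] = reader_name
            break
```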

class tsdat.io.base.Storage[source]

Bases: tsdat.utils.ParameterizedClass, abc.ABC

Abstract base class for the tsdat Storage API. Subclasses of Storage are used in pipelines to persist data and ancillary files (e.g., plots).

Parameters
  • parameters (Any) – Configuration parameters for the Storage API. The specific parameters that are allowed will be defined by subclasses of this base class.

  • handler (DataHandler) – The DataHandler responsible for handling both read and write operations needed by the storage API.

class Parameters[source]

Bases: pydantic.BaseSettings

ancillary_filename_template :str = {datastream}.{date_time}.{title}.{extension}[source]

Template string to use for ancillary filenames.

Allows substitution of the following parameters using curly braces ‘{}’:

  • title: a provided label for the ancillary file or plot.

  • extension: the file extension (e.g., ‘png’, ‘gif’).

  • datastream from the related xr.Dataset object’s global attributes.

  • location_id from the related xr.Dataset object’s global attributes.

  • data_level from the related xr.Dataset object’s global attributes.

  • year, month, day, hour, minute, second of the first timestamp in the data.

  • date_time: the first timestamp in the file formatted as “YYYYMMDD.hhmmss”.

  • The names of any other global attributes of the related xr.Dataset object.

At a minimum the template must include {date_time}.
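The template uses plain str.format-style substitution. A hedged example with invented attribute values:

```python
from datetime import datetime

template = "{datastream}.{date_time}.{title}.{extension}"
first_timestamp = datetime(2024, 3, 1, 12, 30, 0)

filename = template.format(
    datastream="sgp.buoy_z05.a1",  # from dataset.attrs (invented here)
    date_time=first_timestamp.strftime("%Y%m%d.%H%M%S"),  # "YYYYMMDD.hhmmss"
    title="wind_speed",
    extension="png",
)
```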

ancillary_storage_path :str = ancillary/{location_id}/{datastream}[source]

The directory structure under storage_root where ancillary files are saved.

Allows substitution of the following parameters using curly braces ‘{}’:

  • extension: the file extension (e.g., ‘png’, ‘gif’).

  • datastream from the related xr.Dataset object’s global attributes.

  • location_id from the related xr.Dataset object’s global attributes.

  • data_level from the related xr.Dataset object’s global attributes.

  • year, month, day, hour, minute, second of the first timestamp in the data.

  • date_time: the first timestamp in the file formatted as “YYYYMMDD.hhmmss”.

  • The names of any other global attributes of the related xr.Dataset object.

Defaults to ancillary/{location_id}/{datastream}.

storage_root :pathlib.Path[source]

The path on disk where ancillary files will be saved. For file-based storage classes this is also the root path for data files. Defaults to the storage/root folder in the current working directory.

NOTE: This parameter can also be set via the TSDAT_STORAGE_ROOT environment variable.

handler :DataHandler[source]

Defines methods for reading and writing datasets from the storage area.

parameters :Storage.Parameters[source]

Parameters used by the storage API that can be set through configuration files, environment variables, or directly.

Class Methods

fetch_data

Fetches a dataset from the storage area.

get_ancillary_filepath

Returns the filepath for the given datastream and title of an ancillary file to be created.

last_modified

Find the last modified time for any data in that datastream.

modified_since

Find the list of data dates that have been modified since the passed last modified date.

save_ancillary_file

Saves an ancillary filepath to the datastream's ancillary storage area.

save_data

Saves the dataset to the storage area.

uploadable_dir

Context manager that can be used to upload many ancillary files at once.

Method Descriptions

abstract fetch_data(self, start: datetime.datetime, end: datetime.datetime, datastream: str, metadata_kwargs: dict[str, str] | None = None, **kwargs: Any) xarray.Dataset[source]

Fetches a dataset from the storage area.

The timespan of the returned dataset is between the specified start and end times.

Parameters
  • start (datetime) – The start time bound.

  • end (datetime) – The end time bound.

  • datastream (str) – The name of the datastream to fetch.

  • metadata_kwargs (dict[str, str], optional) – Metadata substitutions to help resolve the data storage path. This is only required if the template data storage path includes any properties other than datastream or fields contained in the datastream. Defaults to None.

Returns

xr.Dataset – The fetched dataset.
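The time bounding fetch_data performs can be sketched as a filter over stored timestamps; whether the end bound is inclusive is not specified here, so the half-open interval below is an assumption of this example:

```python
from datetime import datetime

stored_times = [
    datetime(2024, 3, 1, 0, 0),
    datetime(2024, 3, 1, 12, 0),
    datetime(2024, 3, 2, 0, 0),
]
start = datetime(2024, 3, 1)
end = datetime(2024, 3, 2)

# Keep only timestamps within [start, end); inclusivity of `end` is assumed.
fetched = [t for t in stored_times if start <= t < end]
```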

get_ancillary_filepath(self, title: str, extension: str = 'png', dataset: xr.Dataset | None = None, datastream: str | None = None, start: datetime | None = None, root_dir: Path | None = None, mkdirs: bool = True, **kwargs: str) pathlib.Path[source]

Returns the filepath for the given datastream and title of an ancillary file to be created.

This method is typically used in the plotting hook of pipelines to get the path where the plot file should be saved. In this case, it is recommended to call it inside a ``with self.storage.uploadable_dir() as tmp_dir`` block and pass ``root_dir=tmp_dir`` as an argument to this function.

Example:

# in ``hook_plot_dataset(self, dataset: xr.Dataset)``
with self.storage.uploadable_dir() as tmp_dir:
    fig, ax = plt.subplots()

    # plotting code ...

    plot_file = self.storage.get_ancillary_filepath(
        title="wind_speed",
        extension="png",
        root_dir=tmp_dir,
        dataset=dataset,
    )
    fig.savefig(plot_file)
    plt.close(fig)
Parameters
  • title (str) – The title of the ancillary file or plot. Should be lowercase and use _ instead of spaces.

  • extension (str) – The file extension to be used. Defaults to “png”.

  • dataset (xr.Dataset | None, optional) – The dataset relating to the ancillary file. If provided, this is used to populate defaults for the datastream, start datetime, and other substitutions used to fill out the storage path template. Values from these other fields, if present, will take precedence.

  • datastream (str | None, optional) – The datastream relating to the ancillary file to be saved. Defaults to dataset.attrs["datastream"].

  • start (datetime | None, optional) – The datetime relating to the ancillary file to be saved. Defaults to dataset.time[0].

  • root_dir (Path | None, optional) – The root directory. If using a temporary (uploadable) directory, it is recommended to use that as the root_dir. Defaults to None.

  • mkdirs (bool, optional) – True if directories should be created, False otherwise. Defaults to True.

  • **kwargs (str) – Extra kwargs to use as substitutions for the ancillary storage path or filename templates, which may require more parameters than those already specified as arguments here. Defaults to **dataset.attrs.

Returns

Path – The path to the ancillary file.

last_modified(self, datastream: str) Union[datetime.datetime, None][source]

Find the last modified time for any data in that datastream.

Parameters

datastream (str) – The datastream.

Returns

Union[datetime, None] – The datetime of the last modification, or None if the datastream contains no data.

modified_since(self, datastream: str, last_modified: datetime.datetime) List[datetime.datetime][source]

Find the list of data dates that have been modified since the passed last modified date.

Parameters
  • datastream (str) – The name of the datastream to check for modifications.

  • last_modified (datetime) – The reference time; only data dates whose files changed after this time are returned.

Returns

List[datetime] – The data dates of files that were changed since the last modified date.
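A hedged sketch of the modified_since contract, using an invented mapping of data dates to their last file-modification times:

```python
from datetime import datetime

modification_times = {
    datetime(2024, 3, 1): datetime(2024, 3, 5, 9, 0),  # re-processed later
    datetime(2024, 3, 2): datetime(2024, 3, 2, 1, 0),  # untouched since run
}
last_modified = datetime(2024, 3, 4)

# Return the data dates whose files changed after the reference time.
changed_dates = [
    date for date, modified in modification_times.items() if modified > last_modified
]
```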

abstract save_ancillary_file(self, filepath: pathlib.Path, target_path: Path | None = None)[source]

Saves an ancillary filepath to the datastream’s ancillary storage area.

NOTE: In most cases this function should not be used directly. Instead, prefer using the self.uploadable_dir(*args, **kwargs) method.

Parameters
  • filepath (Path) – The path to the ancillary file. This is expected to have a standardized filename and should be saved under the ancillary storage path.

  • target_path (Path, optional) – The destination path where the file should be saved. Defaults to None.

abstract save_data(self, dataset: xarray.Dataset, **kwargs: Any)[source]

Saves the dataset to the storage area.

Parameters

dataset (xr.Dataset) – The dataset to save.

uploadable_dir(self, **kwargs: Any) Generator[pathlib.Path, None, None][source]

Context manager that can be used to upload many ancillary files at once.

This method yields the path to a temporary directory whose contents will be saved to the storage area using the save_ancillary_file method upon exiting the context manager.

Example:

# in ``hook_plot_dataset(self, dataset: xr.Dataset)``
with self.storage.uploadable_dir() as tmp_dir:
    fig, ax = plt.subplots()

    # plotting code ...

    plot_file = self.storage.get_ancillary_filepath(
        title="wind_speed",
        extension="png",
        root_dir=tmp_dir,
        dataset=dataset,
    )
    fig.savefig(plot_file)
    plt.close(fig)
Parameters

kwargs (Any) – Unused. Included for backwards compatibility.

Yields

Path – A temporary directory where files can be saved.