
pipelines

Classes#

IngestPipeline #

Bases: Pipeline


Pipeline class designed to read in raw, unstandardized time series data and enhance its quality and usability. It converts the data into a standard format, embeds metadata, applies quality checks and controls, generates reference plots, and saves the data in an accessible format for later use in scientific analyses or in higher-level tsdat Pipelines.


Attributes#

ds property #
ds: Optional[Dataset]
tmp_dir property #
tmp_dir: Optional[Path]

Functions#

get_ancillary_filepath #
get_ancillary_filepath(
    title: str, extension: str = "png", **kwargs: Any
) -> Path

Returns the path to where an ancillary file should be saved so that it can be synced to the storage area automatically.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| title | str | The title to use for the plot filename. Should only contain alphanumeric and '_' characters. | required |
| extension | str | The file extension. Defaults to "png". | 'png' |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Path | Path | The ancillary filepath. |

Source code in tsdat/pipeline/pipelines.py
def get_ancillary_filepath(
    self, title: str, extension: str = "png", **kwargs: Any
) -> Path:
    """Returns the path to where an ancillary file should be saved so that it can be
    synced to the storage area automatically.

    Args:
        title (str): The title to use for the plot filename. Should only contain
            alphanumeric and '_' characters.
        extension (str, optional): The file extension. Defaults to "png".

    Returns:
        Path: The ancillary filepath.
    """
    dataset = kwargs.pop("dataset", self.ds)
    root_dir = kwargs.pop("root_dir", self.tmp_dir)
    return self.storage.get_ancillary_filepath(
        title=title,
        extension=extension,
        dataset=dataset,
        root_dir=root_dir,
        **kwargs,
    )
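
The keyword fallbacks in the source above (`dataset` falls back to `self.ds`, `root_dir` to `self.tmp_dir`) can be illustrated with stand-in classes. This is a minimal sketch, not tsdat code: `StubStorage`, `StubPipeline`, and the `<root_dir>/<title>.<extension>` layout are assumptions made for illustration, and the real filename is decided by the configured storage class.

```python
from pathlib import Path
from typing import Any


class StubStorage:
    """Hypothetical storage; real tsdat storage classes choose their own layout."""

    def get_ancillary_filepath(
        self, title: str, extension: str, dataset: Any, root_dir: Path, **kwargs: Any
    ) -> Path:
        # Assumed layout, for illustration only: <root_dir>/<title>.<extension>
        return Path(root_dir) / f"{title}.{extension}"


class StubPipeline:
    """Hypothetical pipeline that copies only the kwargs-fallback logic."""

    def __init__(self) -> None:
        self.storage = StubStorage()
        self.ds = {"id": "dataset set by run()"}  # stand-in for the ds property
        self.tmp_dir = Path("uploads")  # stand-in for the tmp_dir property

    def get_ancillary_filepath(
        self, title: str, extension: str = "png", **kwargs: Any
    ) -> Path:
        # Explicit keyword arguments win; otherwise fall back to pipeline state.
        dataset = kwargs.pop("dataset", self.ds)
        root_dir = kwargs.pop("root_dir", self.tmp_dir)
        return self.storage.get_ancillary_filepath(
            title=title,
            extension=extension,
            dataset=dataset,
            root_dir=root_dir,
            **kwargs,
        )


pipeline = StubPipeline()
default_path = pipeline.get_ancillary_filepath("wind_speed")
custom_path = pipeline.get_ancillary_filepath(
    "wind_speed", "pdf", root_dir=Path("elsewhere")
)
```

In the real class, `run` sets the dataset and temporary directory inside the `uploadable_dir()` block just before calling `hook_plot_dataset`, so that hook is the natural place to call this method without extra arguments.
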
hook_customize_dataset #
hook_customize_dataset(dataset: xr.Dataset) -> xr.Dataset

Code hook to customize the retrieved dataset prior to qc being applied.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | The output dataset structure returned by the retriever API. | required |

Returns:

| Type | Description |
| --- | --- |
| Dataset | The customized dataset. |


Source code in tsdat/pipeline/pipelines.py
def hook_customize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Code hook to customize the retrieved dataset prior to qc being applied.

    Args:
        dataset (xr.Dataset): The output dataset structure returned by the retriever
            API.

    Returns:
        xr.Dataset: The customized dataset.

    -----------------------------------------------------------------------------"""
    return dataset
hook_finalize_dataset #
hook_finalize_dataset(dataset: xr.Dataset) -> xr.Dataset

Code hook to finalize the dataset after qc is applied but before it is saved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | The output dataset returned by the retriever API and modified by the hook_customize_dataset user code hook. | required |

Returns:

| Type | Description |
| --- | --- |
| Dataset | The finalized dataset, ready to be saved. |


Source code in tsdat/pipeline/pipelines.py
def hook_finalize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Code hook to finalize the dataset after qc is applied but before it is saved.

    Args:
        dataset (xr.Dataset): The output dataset returned by the retriever API and
            modified by the `hook_customize_dataset` user code hook.

    Returns:
        xr.Dataset: The finalized dataset, ready to be saved.

    -----------------------------------------------------------------------------"""
    return dataset
hook_plot_dataset #
hook_plot_dataset(dataset: xr.Dataset)

Code hook to create plots for the data which runs after the dataset has been saved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | The dataset to plot. | required |

Source code in tsdat/pipeline/pipelines.py
def hook_plot_dataset(self, dataset: xr.Dataset):
    """-----------------------------------------------------------------------------
    Code hook to create plots for the data which runs after the dataset has been saved.

    Args:
        dataset (xr.Dataset): The dataset to plot.

    -----------------------------------------------------------------------------"""
    pass
run #
run(inputs: List[str], **kwargs: Any) -> xr.Dataset
Source code in tsdat/pipeline/pipelines.py
def run(self, inputs: List[str], **kwargs: Any) -> xr.Dataset:
    dataset = self.retriever.retrieve(inputs, self.dataset_config)
    dataset = self.prepare_retrieved_dataset(dataset)
    add_inputs_attr(dataset, inputs)
    dataset = self.hook_customize_dataset(dataset)
    dataset = self.quality.manage(dataset)
    dataset = self.hook_finalize_dataset(dataset)
    # HACK: Fix encoding on datetime64 variables. Use a shallow copy to retain units
    # on datetime64 variables in the pipeline (but remove with decode_cf())
    dataset = decode_cf(dataset)
    self.storage.save_data(dataset)
    with self.storage.uploadable_dir() as tmp_dir:
        self._ds = dataset
        self._tmp_dir = tmp_dir
        self.hook_plot_dataset(dataset)
    return dataset
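
The order in which `run` calls the three user hooks can be traced with a stand-in class. The sketch below is an assumption-laden illustration rather than tsdat code: `StubIngestPipeline` and `TracedPipeline` are hypothetical, a plain dict replaces `xr.Dataset`, and retrieval, quality management, and saving are reduced to trace entries.

```python
from typing import Any, Dict, List

Dataset = Dict[str, Any]  # stand-in for xr.Dataset


class StubIngestPipeline:
    """Hypothetical stand-in that mirrors only the stage order of run()."""

    def __init__(self) -> None:
        self.trace: List[str] = []

    def hook_customize_dataset(self, dataset: Dataset) -> Dataset:
        return dataset

    def hook_finalize_dataset(self, dataset: Dataset) -> Dataset:
        return dataset

    def hook_plot_dataset(self, dataset: Dataset) -> None:
        pass

    def run(self, inputs: List[str]) -> Dataset:
        dataset: Dataset = {"inputs": inputs}
        self.trace.append("retrieve")  # stands in for retriever.retrieve
        dataset = self.hook_customize_dataset(dataset)
        self.trace.append("quality")  # stands in for quality.manage
        dataset = self.hook_finalize_dataset(dataset)
        self.trace.append("save")  # stands in for storage.save_data
        self.hook_plot_dataset(dataset)
        return dataset


class TracedPipeline(StubIngestPipeline):
    """Overrides each hook the way a user pipeline would, recording the call."""

    def hook_customize_dataset(self, dataset: Dataset) -> Dataset:
        self.trace.append("customize")
        dataset["customized"] = True
        return dataset

    def hook_finalize_dataset(self, dataset: Dataset) -> Dataset:
        self.trace.append("finalize")
        return dataset

    def hook_plot_dataset(self, dataset: Dataset) -> None:
        self.trace.append("plot")


pipeline = TracedPipeline()
result = pipeline.run(["raw_file.csv"])
```

Changes made in `hook_customize_dataset` are therefore visible to the quality checks, while changes made in `hook_finalize_dataset` are saved without being checked.
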

TransformationPipeline #

Bases: IngestPipeline


Pipeline class designed to read in standardized time series data and enhance its quality and usability by combining multiple data sources and applying higher-level processing techniques.


Attributes#

parameters instance-attribute #
parameters: Parameters
retriever instance-attribute #
retriever: StorageRetriever

Classes#

Parameters #

Bases: BaseModel

Attributes#
datastreams instance-attribute #
datastreams: List[str]

The list of datastreams that the pipeline is configured to run for. Each datastream name should include its location and data level.

Functions#

hook_customize_input_datasets #
hook_customize_input_datasets(
    input_datasets: Dict[str, xr.Dataset], **kwargs: Any
) -> Dict[str, xr.Dataset]

Code hook to customize any input datasets prior to datastreams being combined and data converters being run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_datasets | Dict[str, Dataset] | A dictionary mapping each input key (str) to its input dataset. For transformation pipelines an input key is not a filename; it is a combination of the datastream and the date range used to pull the input data from the storage retriever. | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Dataset] | The customized input datasets. |


Source code in tsdat/pipeline/pipelines.py
def hook_customize_input_datasets(
    self, input_datasets: Dict[str, xr.Dataset], **kwargs: Any
) -> Dict[str, xr.Dataset]:
    """-----------------------------------------------------------------------------
    Code hook to customize any input datasets prior to datastreams being combined
    and data converters being run.

    Args:
        input_datasets (Dict[str, xr.Dataset]): The dictionary of input key (str) to
            input dataset. Note that for transformation pipelines, input keys !=
            input filename, rather each input key is a combination of the datastream
            and date range used to pull the input data from the storage retriever.

    Returns:
        Dict[str, xr.Dataset]: The customized input datasets.

    -----------------------------------------------------------------------------"""
    return input_datasets
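
`TransformationPipeline.run` builds its retriever inputs in the form `datastream::start::end`. Assuming the hook receives keys in that same form, a hook body could pick out the datasets belonging to one datastream as sketched below. The helper names `split_input_key` and `select_datastream`, the `sgp.met.b1` datastream, and the string values standing in for datasets are all hypothetical.

```python
from typing import Any, Dict, Tuple


def split_input_key(input_key: str) -> Tuple[str, str, str]:
    """Splits 'datastream::start::end' into (datastream, start, end)."""
    datastream, start, end = input_key.split("::")
    return datastream, start, end


def select_datastream(
    input_datasets: Dict[str, Any], datastream: str
) -> Dict[str, Any]:
    """Keeps only the entries whose key starts with the given datastream."""
    return {
        key: dataset
        for key, dataset in input_datasets.items()
        if split_input_key(key)[0] == datastream
    }


# Strings stand in for xr.Dataset objects to keep the sketch dependency-free.
input_datasets = {
    "sgp.aosacsm.b1::20230101::20230102": "dataset A",
    "sgp.met.b1::20230101::20230102": "dataset B",  # hypothetical datastream
}
selected = select_datastream(input_datasets, "sgp.met.b1")
parts = split_input_key("sgp.aosacsm.b1::20230101::20230102")
```
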
run #
run(inputs: List[str], **kwargs: Any) -> xr.Dataset

Runs the data pipeline on the provided inputs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | List[str] | A two-element list, [start date, end date], giving the date range that the pipeline should process. | required |

Returns:

| Type | Description |
| --- | --- |
| Dataset | The processed dataset. |


Source code in tsdat/pipeline/pipelines.py
def run(self, inputs: List[str], **kwargs: Any) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Runs the data pipeline on the provided inputs.

    Args:
        inputs (List[str]): A 2-element list of start-date, end-date that the
            pipeline should process.

    Returns:
        xr.Dataset: The processed dataset.

    -----------------------------------------------------------------------------"""
    if len(inputs) != 2:
        raise ValueError(
            f"'inputs' argument for {self.__repr_name__()}.run(inputs) must be a"
            f" two-element list of [start date, end date]. Got '{inputs}'"
        )

    # Build the input strings for the retriever, which uses a format like:
    # datastream::start::end, e.g., 'sgp.aosacsm.b1::20230101::20230102'
    start, end = inputs[0], inputs[1]
    input_keys = [
        f"{datastream}::{start}::{end}"
        for datastream in self.parameters.datastreams
    ]

    dataset = self.retriever.retrieve(
        input_keys,
        dataset_config=self.dataset_config,
        storage=self.storage,
        input_data_hook=self.hook_customize_input_datasets,
        **kwargs,
    )
    add_inputs_attr(dataset, input_keys)
    dataset = self.prepare_retrieved_dataset(dataset)
    dataset = self.hook_customize_dataset(dataset)
    dataset = self.quality.manage(dataset)
    dataset = self.hook_finalize_dataset(dataset)
    # HACK: Fix encoding on datetime64 variables. Use a shallow copy to retain units
    # on datetime64 variables in the pipeline (but remove with decode_cf())
    dataset = decode_cf(dataset)
    self.storage.save_data(dataset)
    with self.storage.uploadable_dir() as tmp_dir:
        self._ds = dataset
        self._tmp_dir = tmp_dir
        self.hook_plot_dataset(dataset)
    return dataset
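
The input validation and key construction at the top of `run` can be reproduced in isolation. The function below is a minimal sketch that copies only that logic; `build_input_keys` is a hypothetical name that does not exist in tsdat, and `sgp.met.b1` is a made-up second datastream.

```python
from typing import List


def build_input_keys(datastreams: List[str], inputs: List[str]) -> List[str]:
    """Builds one 'datastream::start::end' key per configured datastream."""
    if len(inputs) != 2:
        raise ValueError(
            "'inputs' must be a two-element list of [start date, end date]."
            f" Got '{inputs}'"
        )
    start, end = inputs
    return [f"{datastream}::{start}::{end}" for datastream in datastreams]


keys = build_input_keys(
    ["sgp.aosacsm.b1", "sgp.met.b1"],  # second datastream is hypothetical
    ["20230101", "20230102"],
)
```

Every datastream is paired with the same date range, so a pipeline configured with N datastreams passes N keys to the retriever for each call to `run`.
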

Functions#

add_inputs_attr #

add_inputs_attr(
    dataset: xr.Dataset, inputs: List[str]
) -> None
Source code in tsdat/pipeline/pipelines.py
def add_inputs_attr(dataset: xr.Dataset, inputs: List[str]) -> None:
    # A len(list)=1 attr doesn't survive round trip, so we keep it a string in that case
    # https://github.com/pydata/xarray/issues/4798
    inputs_attr = inputs if len(inputs) != 1 else inputs[0]
    dataset.attrs["inputs"] = inputs_attr
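
The single-input special case means the `inputs` attribute can be either a string or a list. The sketch below repeats the function body against a stand-in object so that it runs without xarray; `SimpleNamespace` with an `attrs` dict is an assumption used in place of `xr.Dataset`, and the file names are invented.

```python
from types import SimpleNamespace
from typing import Any, List


def add_inputs_attr(dataset: Any, inputs: List[str]) -> None:
    # Same logic as above: a one-element list is stored as a bare string.
    inputs_attr = inputs if len(inputs) != 1 else inputs[0]
    dataset.attrs["inputs"] = inputs_attr


single = SimpleNamespace(attrs={})  # stand-in for xr.Dataset
add_inputs_attr(single, ["raw_file_a.csv"])

multiple = SimpleNamespace(attrs={})
add_inputs_attr(multiple, ["raw_file_a.csv", "raw_file_b.csv"])
```

Code that later reads `dataset.attrs["inputs"]` should handle both shapes, for example by wrapping a string in a list before iterating.
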