pipeline

Modules:

Name Description
model_to_dict

Classes:

Name Description
Pipeline

Classes#

Pipeline #

Bases: ParameterizedClass, ABC


Base class for tsdat data pipelines.


Methods:

Name Description
prepare_retrieved_dataset

run

Attributes:

Name Type Description
cfg_filepath Optional[Path]

The pipeline.yaml file containing the parameters used to instantiate this object.

dataset_config DatasetConfig

Describes the structure and metadata of the output dataset.

quality QualityManagement

Manages the dataset quality through checks and corrections.

retriever Retriever

Retrieves data from input keys.

settings Any

storage Storage

Stores the dataset so it can be retrieved later.

triggers List[Pattern]

Regex patterns matching input keys to determine when the pipeline should run.

Attributes#

cfg_filepath class-attribute instance-attribute #
cfg_filepath: Optional[Path] = None

The pipeline.yaml file containing the parameters used to instantiate this object.

dataset_config class-attribute instance-attribute #
dataset_config: DatasetConfig = Field(alias='dataset')

Describes the structure and metadata of the output dataset.

quality instance-attribute #
quality: QualityManagement

Manages the dataset quality through checks and corrections.

retriever instance-attribute #
retriever: Retriever

Retrieves data from input keys.

settings class-attribute instance-attribute #
settings: Any = None

storage instance-attribute #
storage: Storage

Stores the dataset so it can be retrieved later.

triggers class-attribute instance-attribute #
triggers: List[Pattern] = []

Regex patterns matching input keys to determine when the pipeline should run.
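
As a rough illustration of how trigger patterns can select input keys, the sketch below filters a list of keys against a list of compiled patterns. The patterns, the input keys, and the helper `matches_any_trigger` are hypothetical, and the use of full-string matching is an assumption; tsdat's own dispatch logic may match differently.

```python
import re
from typing import List

# Hypothetical trigger patterns, for illustration only.
triggers: List[re.Pattern] = [
    re.compile(r".*buoy.*\.csv"),
    re.compile(r".*lidar.*\.nc"),
]


def matches_any_trigger(input_key: str, patterns: List[re.Pattern]) -> bool:
    """Return True if the key fully matches at least one pattern.

    Assumption: full-string matching; the real pipeline dispatch may differ.
    """
    return any(p.fullmatch(input_key) is not None for p in patterns)


# Hypothetical input keys; only keys matching a trigger would start the pipeline.
input_keys = ["data/buoy_20220101.csv", "data/notes.txt", "raw/lidar_z05.nc"]
selected = [key for key in input_keys if matches_any_trigger(key, triggers)]
print(selected)
```

With the default `triggers = []`, this helper would select nothing, since `any()` over an empty list is `False`.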

Functions#

prepare_retrieved_dataset #
prepare_retrieved_dataset(
    dataset: xr.Dataset,
) -> xr.Dataset

Modifies the retrieved dataset by dropping variables not declared in the DatasetConfig, adding static variables, initializing non-retrieved variables, and importing global and variable-level attributes from the DatasetConfig.

Parameters:

Name Type Description Default
dataset Dataset

The retrieved dataset.

required

Returns:

Type Description
Dataset

xr.Dataset: The dataset with structure and metadata matching the DatasetConfig.


Source code in tsdat/pipeline/base/pipeline.py
def prepare_retrieved_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Modifies the retrieved dataset by dropping variables not declared in the
    DatasetConfig, adding static variables, initializing non-retrieved variables,
    and importing global and variable-level attributes from the DatasetConfig.

    Args:
        dataset (xr.Dataset): The retrieved dataset.

    Returns:
        xr.Dataset: The dataset with structure and metadata matching the
        DatasetConfig.

    -----------------------------------------------------------------------------"""
    output_vars = list(self.dataset_config.coords) + list(
        self.dataset_config.data_vars
    )
    retrieved_variables = cast(List[str], list(dataset.variables))
    vars_to_drop = [ret for ret in retrieved_variables if ret not in output_vars]
    vars_to_add = [out for out in output_vars if out not in retrieved_variables]

    dataset = dataset.drop_vars(vars_to_drop)
    dataset = self._add_dataset_dtypes(dataset)
    dataset = self._add_dataset_variables(dataset, vars_to_add)
    dataset = self._add_dataset_attrs(dataset, output_vars)
    # TODO: reorder dataset coords / data vars to match the order in the config file
    return dataset
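
The drop/add bookkeeping at the top of this method can be tried in isolation with plain lists. The sketch below reuses the two list comprehensions from the source above; the variable names in `output_vars` and `retrieved_variables` are made up for illustration and stand in for the contents of a DatasetConfig and a retrieved dataset.

```python
from typing import List

# Hypothetical names declared in the config (coords followed by data_vars).
output_vars: List[str] = ["time", "temperature", "humidity", "qc_temperature"]

# Hypothetical names present in the retrieved dataset.
retrieved_variables: List[str] = ["time", "temperature", "raw_voltage"]

# Retrieved but not declared in the config: these get dropped.
vars_to_drop = [ret for ret in retrieved_variables if ret not in output_vars]

# Declared in the config but not retrieved: these get added and initialized.
vars_to_add = [out for out in output_vars if out not in retrieved_variables]

print(vars_to_drop)  # ['raw_voltage']
print(vars_to_add)   # ['humidity', 'qc_temperature']
```

Both lists preserve the order of the list they iterate over, so `vars_to_add` follows the config order.
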
run abstractmethod #
run(inputs: List[str], **kwargs: Any) -> Any

Runs the data pipeline on the provided inputs.

Parameters:

Name Type Description Default
inputs List[str]

A list of input keys that the pipeline's Retriever class can use to load data into the pipeline.

required

Returns:

Type Description
Any

The processed dataset. The docstring documents this as an xr.Dataset, although the signature is annotated as Any.


Source code in tsdat/pipeline/base/pipeline.py
@abstractmethod
def run(self, inputs: List[str], **kwargs: Any) -> Any:
    """-----------------------------------------------------------------------------
    Runs the data pipeline on the provided inputs.

    Args:
        inputs (List[str]): A list of input keys that the pipeline's Retriever class
            can use to load data into the pipeline.

    Returns:
        xr.Dataset: The processed dataset.

    -----------------------------------------------------------------------------"""
    ...
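
Because `run` is abstract, a concrete pipeline has to override it before it can be instantiated. The sketch below shows that mechanism with standard-library classes only. `PipelineSketch` and `EchoPipeline` are hypothetical stand-ins, not tsdat classes, and the sketch omits the `ParameterizedClass` base and all configured attributes (retriever, quality, storage, and so on).

```python
from abc import ABC, abstractmethod
from typing import Any, List


class PipelineSketch(ABC):
    """Simplified stand-in for Pipeline; not the tsdat class itself."""

    @abstractmethod
    def run(self, inputs: List[str], **kwargs: Any) -> Any:
        ...


class EchoPipeline(PipelineSketch):
    """Hypothetical subclass that only reports what it was asked to process."""

    def run(self, inputs: List[str], **kwargs: Any) -> Any:
        return {"inputs": list(inputs), "options": dict(kwargs)}


# The abstract base cannot be instantiated directly.
try:
    PipelineSketch()
    instantiated_base = True
except TypeError:
    instantiated_base = False

# The concrete subclass can, and extra keyword arguments pass through **kwargs.
result = EchoPipeline().run(["data/a.csv"], dry_run=True)
print(instantiated_base, result)
```

A real subclass would presumably use its retriever, quality, and storage attributes inside `run` instead of echoing its arguments.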

Modules#