transformation_pipeline

Modules:

| Name | Description |
| ---- | ----------- |
| decode_cf | |

Classes:

| Name | Description |
| ---- | ----------- |
| TransformationPipeline | Pipeline class designed to read in standardized time series data and enhance its quality and usability. |

Classes#

TransformationPipeline #

Bases: IngestPipeline


Pipeline class designed to read in standardized time series data and enhance its quality and usability by combining multiple sources of data and applying higher-level processing techniques.
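
In practice, a transformation pipeline is declared in a pipeline configuration file and run over a date range rather than over a list of input files. The sketch below is a minimal usage example, assuming a pipeline.yaml exists that names this class (or a subclass) as its classname; the config path and dates are placeholders.

from pathlib import Path

from tsdat import PipelineConfig

# Hypothetical config path; the pipeline.yaml is assumed to declare a
# TransformationPipeline (or subclass) along with its retriever, dataset,
# quality, and storage configurations.
config = PipelineConfig.from_yaml(Path("pipelines/example_vap/config/pipeline.yaml"))
pipeline = config.instantiate_pipeline()

# Transformation pipelines take a [start date, end date] pair instead of file paths.
dataset = pipeline.run(["20230101", "20230102"])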


Classes:

| Name | Description |
| ---- | ----------- |
| Parameters | |

Methods:

| Name | Description |
| ---- | ----------- |
| hook_customize_input_datasets | Code hook to customize any input datasets prior to datastreams being combined and data converters being run. |
| run | Runs the data pipeline on the provided inputs. |

Attributes:

| Name | Type | Description |
| ---- | ---- | ----------- |
| parameters | Parameters | |
| retriever | StorageRetriever | |

Attributes#

parameters instance-attribute #
parameters: Parameters
retriever instance-attribute #
retriever: StorageRetriever

Classes#

Parameters #

Bases: BaseModel

Attributes:

| Name | Type | Description |
| ---- | ---- | ----------- |
| datastreams | List[str] | A list of datastreams that the pipeline should be configured to run for. |

Attributes#
datastreams instance-attribute #
datastreams: List[str]

A list of datastreams that the pipeline should be configured to run for. Datastreams should include the location and data level information.
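
These parameters are normally supplied through the pipeline's configuration file, but the model can also be constructed directly. A minimal sketch follows; the datastream names are examples only (the first is borrowed from the source comments below, the second is hypothetical), following the location.dataset.data_level naming convention.

from tsdat.pipeline.pipelines.transformation_pipeline import TransformationPipeline

# Example values only: "sgp" is the location, "aosacsm"/"met" are dataset names,
# and "b1" is the data level.
params = TransformationPipeline.Parameters(
    datastreams=["sgp.aosacsm.b1", "sgp.met.b1"]
)
print(params.datastreams)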

Functions#

hook_customize_input_datasets #
hook_customize_input_datasets(
    input_datasets: Dict[str, xr.Dataset], **kwargs: Any
) -> Dict[str, xr.Dataset]

Code hook to customize any input datasets prior to datastreams being combined and data converters being run.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| input_datasets | Dict[str, Dataset] | The dictionary mapping each input key (str) to an input dataset. Note that for transformation pipelines, input keys are not input filenames; rather, each input key is a combination of the datastream and the date range used to pull the input data from the storage retriever. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| Dict[str, Dataset] | The customized input datasets. |


Source code in tsdat/pipeline/pipelines/transformation_pipeline.py
def hook_customize_input_datasets(
    self, input_datasets: Dict[str, xr.Dataset], **kwargs: Any
) -> Dict[str, xr.Dataset]:
    """-----------------------------------------------------------------------------
    Code hook to customize any input datasets prior to datastreams being combined
    and data converters being run.

    Args:
        input_datasets (Dict[str, xr.Dataset]): The dictionary of input key (str) to
            input dataset. Note that for transformation pipelines, input keys !=
            input filename, rather each input key is a combination of the datastream
            and date range used to pull the input data from the storage retriever.

    Returns:
        Dict[str, xr.Dataset]: The customized input datasets.

    -----------------------------------------------------------------------------"""
    return input_datasets
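
As an illustration, here is a hedged sketch of overriding this hook in a pipeline subclass. The subclass name, the datastream prefix, and the temp_c/temp_k variables are hypothetical; the point is that inputs are matched on their retriever keys, not on filenames.

from typing import Any, Dict

import xarray as xr

from tsdat.pipeline.pipelines.transformation_pipeline import TransformationPipeline


class ExampleVapPipeline(TransformationPipeline):
    """Hypothetical subclass showing how input datasets can be adjusted."""

    def hook_customize_input_datasets(
        self, input_datasets: Dict[str, xr.Dataset], **kwargs: Any
    ) -> Dict[str, xr.Dataset]:
        for key, ds in input_datasets.items():
            # Keys follow the "datastream::start::end" pattern, so match on the
            # datastream prefix rather than on a filename.
            if key.startswith("sgp.aosacsm.b1") and "temp_c" in ds:
                # Hypothetical clean-up: derive a Kelvin variable from Celsius.
                input_datasets[key] = ds.assign(temp_k=ds["temp_c"] + 273.15)
        return input_datasets
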
run #
run(inputs: List[str], **kwargs: Any) -> xr.Dataset

Runs the data pipeline on the provided inputs.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| inputs | List[str] | A 2-element list of [start date, end date] that the pipeline should process. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| Dataset | The processed dataset. |


Source code in tsdat/pipeline/pipelines/transformation_pipeline.py
def run(self, inputs: List[str], **kwargs: Any) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Runs the data pipeline on the provided inputs.

    Args:
        inputs (List[str]): A 2-element list of start-date, end-date that the
            pipeline should process.

    Returns:
        xr.Dataset: The processed dataset.

    -----------------------------------------------------------------------------"""
    if len(inputs) != 2:
        raise ValueError(
            f"'inputs' argument for {self.__repr_name__()}.run(inputs) must be a"
            f" two-element list of [start date, end date]. Got '{inputs}'"
        )

    # Build the input strings for the retriever, which uses a format like:
    # datastream::start::end, e.g., 'sgp.aosacsm.b1::20230101::20230102'
    start, end = inputs[0], inputs[1]
    input_keys = [
        f"{datastream}::{start}::{end}"
        for datastream in self.parameters.datastreams
    ]

    dataset = self.retriever.retrieve(
        input_keys,
        dataset_config=self.dataset_config,
        storage=self.storage,
        input_data_hook=self.hook_customize_input_datasets,
        **kwargs,
    )
    add_inputs_attr(dataset, input_keys)
    dataset = self.prepare_retrieved_dataset(dataset)
    dataset = self.hook_customize_dataset(dataset)
    dataset = self.quality.manage(dataset)
    dataset = self.hook_finalize_dataset(dataset)
    # HACK: Fix encoding on datetime64 variables. Use a shallow copy to retain units
    # on datetime64 variables in the pipeline (but remove with decode_cf())
    dataset = decode_cf(dataset)
    self.storage.save_data(dataset)
    with self.storage.uploadable_dir() as tmp_dir:
        self._ds = dataset
        self._tmp_dir = tmp_dir
        self.hook_plot_dataset(dataset)
    return dataset
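
To make the input-key construction above concrete, the short illustration below shows how the two-element inputs list expands into retriever keys; it mirrors the list comprehension in the source, and the datastream names are placeholders.

inputs = ["20230101", "20230102"]
datastreams = ["sgp.aosacsm.b1", "sgp.met.b1"]  # would come from pipeline parameters

start, end = inputs[0], inputs[1]
input_keys = [f"{datastream}::{start}::{end}" for datastream in datastreams]
# -> ["sgp.aosacsm.b1::20230101::20230102", "sgp.met.b1::20230101::20230102"]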
