ingest_pipeline

Classes:

  IngestPipeline: Pipeline class that ingests raw, unstandardized time series data into a standard, quality-controlled format.

Classes

IngestPipeline

Bases: Pipeline


Pipeline class designed to read in raw, unstandardized time series data and improve its quality and usability. It converts the data into a standard format, embeds metadata, applies quality checks and controls, saves the data in an accessible format, and generates reference plots, so that the data can be used later in scientific analyses or in higher-level tsdat Pipelines.


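Methods:

  get_ancillary_filepath: Returns the path to where an ancillary file should be saved so that it can be synced to the storage area automatically.
  hook_customize_dataset: Code hook to customize the retrieved dataset before QC is applied.
  hook_finalize_dataset: Code hook to finalize the dataset after QC is applied but before it is saved.
  hook_plot_dataset: Code hook to create plots of the data after the dataset has been saved.
  run: Runs the full ingest on the given inputs and returns the processed dataset.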

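Attributes:

  ds (Optional[Dataset]): The dataset being processed. run() stores it just before calling hook_plot_dataset, and get_ancillary_filepath uses it as the default dataset.
  tmp_dir (Optional[Path]): The temporary directory whose contents are synced to storage. run() stores it just before calling hook_plot_dataset, and get_ancillary_filepath uses it as the default root directory.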

Attributes

ds (property)
ds: Optional[Dataset]

tmp_dir (property)
tmp_dir: Optional[Path]

Functions

get_ancillary_filepath
get_ancillary_filepath(
    title: str, extension: str = "png", **kwargs: Any
) -> Path

Returns the path to where an ancillary file should be saved so that it can be synced to the storage area automatically.

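Parameters:

  title (str, required): The title to use in the ancillary (e.g., plot) filename. Should only contain alphanumeric and '_' characters.
  extension (str, default 'png'): The file extension.
  **kwargs (Any): Optional overrides. `dataset` defaults to `self.ds` and `root_dir` defaults to `self.tmp_dir`; any remaining keyword arguments are passed through to `self.storage.get_ancillary_filepath()`.

Returns:

  Path: The ancillary filepath.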

Source code in tsdat/pipeline/pipelines/ingest_pipeline.py
def get_ancillary_filepath(
    self, title: str, extension: str = "png", **kwargs: Any
) -> Path:
    """Returns the path to where an ancillary file should be saved so that it can be
    synced to the storage area automatically.

    Args:
        title (str): The title to use for the plot filename. Should only contain
            alphanumeric and '_' characters.
        extension (str, optional): The file extension. Defaults to "png".

    Returns:
        Path: The ancillary filepath.
    """
    dataset = kwargs.pop("dataset", self.ds)
    root_dir = kwargs.pop("root_dir", self.tmp_dir)
    return self.storage.get_ancillary_filepath(
        title=title,
        extension=extension,
        dataset=dataset,
        root_dir=root_dir,
        **kwargs,
    )
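The method pops `dataset` and `root_dir` from `**kwargs`, falling back to `self.ds` and `self.tmp_dir`, and forwards everything else to the storage layer. The sketch below reproduces only that argument-handling pattern with hypothetical stand-in classes (`FakeStorage`, `FakePipeline`); the `<datastream>.<title>.<extension>` filename scheme built by `FakeStorage` is an assumption for illustration, not tsdat's actual naming convention.

```python
from pathlib import Path
from typing import Any, Dict, Optional


class FakeStorage:
    """Hypothetical stand-in for a tsdat storage object (not tsdat's API)."""

    def get_ancillary_filepath(
        self,
        title: str,
        extension: str = "png",
        dataset: Optional[Dict[str, Any]] = None,
        root_dir: Optional[Path] = None,
        **kwargs: Any,
    ) -> Path:
        # Assumed naming scheme for illustration: <root_dir>/<datastream>.<title>.<extension>
        datastream = dataset["datastream"] if dataset else "unknown"
        base = root_dir if root_dir is not None else Path(".")
        return base / f"{datastream}.{title}.{extension}"


class FakePipeline:
    """Hypothetical stand-in that copies IngestPipeline's keyword handling."""

    def __init__(self) -> None:
        self.storage = FakeStorage()
        # In tsdat these values are exposed as the ds / tmp_dir properties.
        self.ds: Optional[Dict[str, Any]] = {"datastream": "site.instrument.b1"}
        self.tmp_dir: Optional[Path] = Path("/tmp/upload")

    def get_ancillary_filepath(
        self, title: str, extension: str = "png", **kwargs: Any
    ) -> Path:
        # Explicit keyword arguments win; otherwise fall back to the cached state.
        dataset = kwargs.pop("dataset", self.ds)
        root_dir = kwargs.pop("root_dir", self.tmp_dir)
        # Anything left in kwargs is forwarded untouched to the storage layer.
        return self.storage.get_ancillary_filepath(
            title=title,
            extension=extension,
            dataset=dataset,
            root_dir=root_dir,
            **kwargs,
        )


pipeline = FakePipeline()
print(pipeline.get_ancillary_filepath("wind_speed"))
print(pipeline.get_ancillary_filepath("qc_summary", "csv", root_dir=Path("/data/out")))
```

Because the defaults come from `self.ds` and `self.tmp_dir`, which run() assigns right before calling hook_plot_dataset, calls made outside that hook probably need to pass `dataset` and `root_dir` explicitly.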

hook_customize_dataset
hook_customize_dataset(dataset: xr.Dataset) -> xr.Dataset

Code hook to customize the retrieved dataset before QC is applied.

Parameters:

  dataset (Dataset, required): The output dataset structure returned by the retriever API.

Returns:

  xr.Dataset: The customized dataset.


Source code in tsdat/pipeline/pipelines/ingest_pipeline.py
def hook_customize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Code hook to customize the retrieved dataset prior to qc being applied.

    Args:
        dataset (xr.Dataset): The output dataset structure returned by the retriever
            API.

    Returns:
        xr.Dataset: The customized dataset.

    -----------------------------------------------------------------------------"""
    return dataset
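Because run() executes `dataset = self.hook_customize_dataset(dataset)` and then hands the result to `self.quality.manage()`, an override must return a dataset. The sketch below shows the override pattern using a plain dictionary as a hypothetical stand-in for an `xr.Dataset`, so it runs without tsdat or xarray installed; with the real library you would subclass IngestPipeline and add the variable to the xarray Dataset instead. The variable names `temp_c` and `temp_k` are illustrative.

```python
from typing import Dict, List

# Hypothetical stand-in for an xr.Dataset: variable name -> list of values.
Dataset = Dict[str, List[float]]


class BasePipeline:
    """Stand-in for IngestPipeline's default hook, which returns the dataset unchanged."""

    def hook_customize_dataset(self, dataset: Dataset) -> Dataset:
        return dataset


class MyPipeline(BasePipeline):
    """Hypothetical user pipeline overriding the hook."""

    def hook_customize_dataset(self, dataset: Dataset) -> Dataset:
        # Derive a new variable before QC runs, so QC configured for it can be applied.
        dataset["temp_k"] = [t + 273.15 for t in dataset["temp_c"]]
        # run() replaces its dataset with this return value, so always return it;
        # returning None would hand None to the QC step.
        return dataset


ds = MyPipeline().hook_customize_dataset({"temp_c": [0.0, 25.0]})
print(ds["temp_k"])
```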

hook_finalize_dataset
hook_finalize_dataset(dataset: xr.Dataset) -> xr.Dataset

Code hook to finalize the dataset after QC is applied but before it is saved.

Parameters:

  dataset (Dataset, required): The output dataset returned by the retriever API, modified by the hook_customize_dataset user code hook, and processed by QC.

Returns:

  xr.Dataset: The finalized dataset, ready to be saved.


Source code in tsdat/pipeline/pipelines/ingest_pipeline.py
def hook_finalize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Code hook to finalize the dataset after qc is applied but before it is saved.

    Args:
        dataset (xr.Dataset): The output dataset returned by the retriever API and
            modified by the `hook_customize_dataset` user code hook.

    Returns:
        xr.Dataset: The finalized dataset, ready to be saved.

    -----------------------------------------------------------------------------"""
    return dataset
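hook_finalize_dataset runs after `self.quality.manage()` and before `self.storage.save_data()`, which makes it the place for last-minute changes that should see QC results and end up in the saved output. The sketch below uses hypothetical dictionaries in place of `xr.Dataset`; the `qc_temp` variable and its "nonzero means flagged" convention are assumptions made for illustration, not tsdat's QC encoding.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for an xr.Dataset with data variables and global attributes.
Dataset = Dict[str, Dict[str, Any]]


class MyPipeline:
    """Hypothetical user pipeline; only the finalize hook is shown."""

    def hook_finalize_dataset(self, dataset: Dataset) -> Dataset:
        # QC has already run at this point, so its results can be summarized.
        qc_flags: List[int] = dataset["data_vars"]["qc_temp"]
        dataset["attrs"]["flagged_samples"] = sum(1 for flag in qc_flags if flag != 0)
        # Drop a scratch variable that should not be written to the output file.
        dataset["data_vars"].pop("scratch", None)
        return dataset


ds: Dataset = {
    "data_vars": {
        "temp": [20.1, 99.9, 20.3],
        "qc_temp": [0, 1, 0],  # hypothetical QC flags: nonzero means flagged
        "scratch": [1, 2, 3],
    },
    "attrs": {},
}
ds = MyPipeline().hook_finalize_dataset(ds)
print(ds["attrs"], sorted(ds["data_vars"]))
```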

hook_plot_dataset
hook_plot_dataset(dataset: xr.Dataset)

Code hook for creating plots of the data. It runs after the dataset has been saved.

Parameters:

  dataset (Dataset, required): The dataset to plot.

Source code in tsdat/pipeline/pipelines/ingest_pipeline.py
def hook_plot_dataset(self, dataset: xr.Dataset):
    """-----------------------------------------------------------------------------
    Code hook to create plots for the data which runs after the dataset has been saved.

    Args:
        dataset (xr.Dataset): The dataset to plot.

    -----------------------------------------------------------------------------"""
    pass
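In run(), hook_plot_dataset is called inside `self.storage.uploadable_dir()` after the dataset and temporary directory have been stored on the pipeline, which is why get_ancillary_filepath() can be called inside the hook without a dataset or root directory. The stdlib-only sketch below mimics that flow with hypothetical `FakeStorage` and `MyPipeline` classes. It assumes `uploadable_dir` copies the temporary directory's contents into storage when the `with` block exits; that is an illustration of the "synced to the storage area automatically" behavior, not tsdat's implementation.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Iterator, List, Optional


class FakeStorage:
    """Hypothetical stand-in: files left in the temp dir are copied to storage on exit."""

    def __init__(self, storage_root: Path) -> None:
        self.storage_root = storage_root

    @contextmanager
    def uploadable_dir(self) -> Iterator[Path]:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_dir = Path(tmp)
            yield tmp_dir
            # Assumed behavior: "upload" everything written during the with block.
            for path in tmp_dir.iterdir():
                shutil.copy2(path, self.storage_root / path.name)


class MyPipeline:
    """Hypothetical pipeline reproducing only the plotting tail of run()."""

    def __init__(self, storage: FakeStorage) -> None:
        self.storage = storage
        self._tmp_dir: Optional[Path] = None

    def get_ancillary_filepath(self, title: str, extension: str = "png") -> Path:
        # Simplified: the real method delegates to storage with root_dir=tmp_dir.
        assert self._tmp_dir is not None, "only valid inside hook_plot_dataset"
        return self._tmp_dir / f"{title}.{extension}"

    def hook_plot_dataset(self, dataset: Dict[str, List[float]]) -> None:
        # A real hook might save a matplotlib figure here; a text file keeps
        # the sketch dependency-free.
        path = self.get_ancillary_filepath("temp_summary", "txt")
        path.write_text(f"max temp: {max(dataset['temp'])}\n")

    def finish_run(self, dataset: Dict[str, List[float]]) -> None:
        # Mirrors the end of IngestPipeline.run(): cache state, then call the hook.
        with self.storage.uploadable_dir() as tmp_dir:
            self._tmp_dir = tmp_dir
            self.hook_plot_dataset(dataset)


storage_root = Path(tempfile.mkdtemp())
MyPipeline(FakeStorage(storage_root)).finish_run({"temp": [20.1, 22.4]})
print((storage_root / "temp_summary.txt").read_text())
```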
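
run
run(inputs: List[str], **kwargs: Any) -> xr.Dataset

Runs the full ingest on the given inputs. The retriever reads the inputs into a dataset, which is then prepared, annotated with the inputs (add_inputs_attr), passed through hook_customize_dataset, processed by the QC manager, passed through hook_finalize_dataset, decoded with decode_cf(), and saved. Finally, hook_plot_dataset is called inside the storage's uploadable directory, after the dataset and temporary directory have been stored on the pipeline, so that ancillary files written there can be synced to storage.

Parameters:

  inputs (List[str], required): The inputs passed to the retriever.
  **kwargs (Any): Accepted, but not used by this implementation.

Returns:

  xr.Dataset: The processed dataset that was passed to storage.
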
Source code in tsdat/pipeline/pipelines/ingest_pipeline.py
def run(self, inputs: List[str], **kwargs: Any) -> xr.Dataset:
    dataset = self.retriever.retrieve(inputs, self.dataset_config)
    dataset = self.prepare_retrieved_dataset(dataset)
    add_inputs_attr(dataset, inputs)
    dataset = self.hook_customize_dataset(dataset)
    dataset = self.quality.manage(dataset)
    dataset = self.hook_finalize_dataset(dataset)
    # HACK: Fix encoding on datetime64 variables. Use a shallow copy to retain units
    # on datetime64 variables in the pipeline (but remove with decode_cf())
    dataset = decode_cf(dataset)
    self.storage.save_data(dataset)
    with self.storage.uploadable_dir() as tmp_dir:
        self._ds = dataset
        self._tmp_dir = tmp_dir
        self.hook_plot_dataset(dataset)
    return dataset
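run() is a fixed sequence that calls the three user hooks at specific points. The sketch below is a hypothetical, dependency-free stand-in (`SketchPipeline`, not part of tsdat) that records each step's name so the order is easy to check; the real retriever, QC manager, `decode_cf()`, and storage calls are replaced by no-op placeholders.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional

Dataset = Dict[str, Any]  # hypothetical stand-in for xr.Dataset


class SketchPipeline:
    """Hypothetical stand-in whose run() follows the step order of IngestPipeline.run()."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order in which steps execute
        self._ds: Optional[Dataset] = None
        self._tmp_dir: Optional[str] = None

    def _step(self, name: str, dataset: Dataset) -> Dataset:
        # Placeholder for a real processing step: log it and pass data through.
        self.calls.append(name)
        return dataset

    def hook_customize_dataset(self, dataset: Dataset) -> Dataset:
        return self._step("customize", dataset)

    def hook_finalize_dataset(self, dataset: Dataset) -> Dataset:
        return self._step("finalize", dataset)

    def hook_plot_dataset(self, dataset: Dataset) -> None:
        self._step("plot", dataset)

    @contextmanager
    def uploadable_dir(self) -> Iterator[str]:
        yield "/hypothetical/tmp_dir"

    def run(self, inputs: List[str]) -> Dataset:
        dataset = self._step("retrieve", {"inputs": list(inputs)})
        dataset = self._step("prepare", dataset)
        self._step("add_inputs_attr", dataset)
        dataset = self.hook_customize_dataset(dataset)
        dataset = self._step("quality.manage", dataset)
        dataset = self.hook_finalize_dataset(dataset)
        dataset = self._step("decode_cf", dataset)
        self._step("save_data", dataset)
        with self.uploadable_dir() as tmp_dir:
            # State is cached before the plot hook, as in the real run().
            self._ds, self._tmp_dir = dataset, tmp_dir
            self.hook_plot_dataset(dataset)
        return dataset


pipeline = SketchPipeline()
result = pipeline.run(["raw_file_1.csv"])
print(" -> ".join(pipeline.calls))
```

The practical consequence of this order: changes made in hook_customize_dataset are seen by QC, changes made in hook_finalize_dataset are saved, and hook_plot_dataset only sees data that has already been saved.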