Transformation / VAP Pipelines¶
Transformation pipelines, also referred to as Value-Added Product (VAP) pipelines, are tsdat pipelines that use data from several standardized input sources and combine them in ways that add value to the data.
Warning
Tsdat support for transformation pipelines is currently in a beta phase, meaning that new features are being actively developed and APIs involved may be relatively unstable as new use cases are added and requirements are discovered. We greatly appreciate any feedback on this new capability.
Tsdat transformation pipelines are configured in almost exactly the same way as the ingestion pipelines you may already be used to. In fact, the tsdat TransformationPipeline class inherits all of its methods and attributes from the IngestPipeline class and only overrides the retriever code to ensure that input data are retrieved from the storage area.
Only the pipeline.yaml and retriever.yaml configuration files have any differences from their counterparts for a tsdat ingest. These are shown below.
Installation¶
One additional library is needed in order to use the new transformation methods: adi_py.
This can be installed via conda install -c arm-doe adi_py
.
This library contains C code and cython bindings from the Atmospheric Radiation Measurement (ARM) Program’s ARM Data Integrator (ADI)
transformation library, which provides one critical feature over the transformation methods built-in to xarray: handling data quality
as part of the transformation process. The adi_py
library reads data quality flags from qc_* variables and will opt to not use
points flagged as bad in transformations such as interpolation and averaging. Additionally, the library outputs a qc variable for each
transformed variable describing the quality of the transformation based on the quality of any input qc flags found in the input data.
Pipeline Configuration File¶
The pipeline configuration file for transformation pipelines is almost identical to its ingest pipeline counterpart. There are only a few differences:
The classname should point to tsdat.TransformationPipeline, or a class derived from it.
The parameters for the class should include a
datastreams
entry mapping to a list of input datastreams that are needed as input to the pipeline.The trigger should be empty since transformation pipelines are currently run manually.
An example transformation pipeline pipeline.yaml
file is shown below. Highlighted lines show notable differences from a typical pipeline configuration file for an IngestPipeline.
classname: tsdat.TransformationPipeline
parameters:
datastreams:
- humboldt.lidar.b0
- humboldt.met.b0
triggers: {}
retriever:
path: pipelines/example_pipeline/config/retriever.yaml
dataset:
path: shared/config/dataset.yaml
quality:
path: shared/config/default-quality.yaml
storage:
path: shared/config/storage.yaml
Retriever Configuration File¶
The retriever configuration file for transformation pipelines is also similar to its ingest pipeline counterpart, but there are some notable differences, mostly pertaining to how data from various input sources should be combined. These are noted below:
The classname should point to tsdat.StorageRetriever, or a class derived from it. This class requires additional transformation_parameters to be specified.
The tsdat.transform module was added, including methods for creating time coordinate grids and various transformation methods:
NearestNeighbor
,BinAverage
,Interpolate
, andAutomatic
.
An example retriever.yaml
file is shown below. Highlighted lines show notable differences from a typical retriever configuration file for an IngestPipeline.
classname: tsdat.StorageRetriever
parameters:
# Set coordinate system defaults for alignment, range, width
transformation_parameters:
# Alignment is one of LEFT, RIGHT, CENTER and indicates where the output point should
# lie in relation to the reported timestamp range.
alignment:
time: CENTER
# Range is how far to look for the previous/next points when transforming over an
# interval. E.g., for nearest neighbor, this is how far to look for the closest available
# point. If nothing is close enough, the output corresponding with that timestamp will be
# NaN/missing
range:
time: 900s
# Width is the size of the output dimension bins. E.g. width=300s with center alignment
# would mean that each timestamp in the output represents the period from 150s before and 150s
# after the reported timestamp.
width:
time: 300s
coords:
time:
name: NA # not retrieved from input; this will be autogenerated instead
data_converters:
- classname: tsdat.transform.CreateTimeGrid
interval: 5min
data_vars:
temperature:
.*met\.b0.*:
name: temp
data_converters:
- classname: tsdat.io.converters.UnitsConverter
input_units: degF
- classname: tsdat.transform.NearestNeighbor
coord: time
humidity:
.*met\.b0.*:
name: rh
data_converters:
- classname: tsdat.transform.NearestNeighbor
coord: time
Pipeline Code Hooks¶
The TransformationPipeline class provides one additional hook that is currently not available in the IngestPipeline class: the hook_customize_input_datasets hook. This code hook allows you to customize input datasets/files before they are merged onto the same coordinate grid.
def hook_customize_input_datasets(
self, input_datasets: Dict[str, xr.Dataset], **kwargs: Any
) -> Dict[str, xr.Dataset]:
"""-----------------------------------------------------------------------------
Code hook to customize any input datasets prior to datastreams being combined
and data converters being run.
Args:
input_datasets (Dict[str, xr.Dataset]): The dictionary of input key (str) to
input dataset. Note that for transformation pipelines, input keys !=
input filename, rather each input key is a combination of the datastream
and date range used to pull the input data from the storage retriever.
Returns:
Dict[str, xr.Dataset]: The customized input datasets.
-----------------------------------------------------------------------------"""
return input_datasets